1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication.regionserver;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Map.Entry;
29 import java.util.TreeMap;
30 import java.util.UUID;
31 import java.util.concurrent.atomic.AtomicLong;
32
33 import org.apache.commons.lang.StringUtils;
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.hbase.classification.InterfaceAudience;
37 import org.apache.hadoop.conf.Configuration;
38 import org.apache.hadoop.hbase.Cell;
39 import org.apache.hadoop.hbase.CellScanner;
40 import org.apache.hadoop.hbase.CellUtil;
41 import org.apache.hadoop.hbase.TableName;
42 import org.apache.hadoop.hbase.HBaseConfiguration;
43 import org.apache.hadoop.hbase.HConstants;
44 import org.apache.hadoop.hbase.Stoppable;
45 import org.apache.hadoop.hbase.client.Connection;
46 import org.apache.hadoop.hbase.client.ConnectionFactory;
47 import org.apache.hadoop.hbase.client.Delete;
48 import org.apache.hadoop.hbase.client.Mutation;
49 import org.apache.hadoop.hbase.client.Put;
50 import org.apache.hadoop.hbase.client.Row;
51 import org.apache.hadoop.hbase.client.Table;
52 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
53 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70 @InterfaceAudience.Private
71 public class ReplicationSink {
72
73 private static final Log LOG = LogFactory.getLog(ReplicationSink.class);
74 private final Configuration conf;
75
76
77 private volatile Connection sharedHtableCon;
78 private final MetricsSink metrics;
79 private final AtomicLong totalReplicatedEdits = new AtomicLong();
80 private final Object sharedHtableConLock = new Object();
81
82
83
84
85
86
87
88
89 public ReplicationSink(Configuration conf, Stoppable stopper)
90 throws IOException {
91 this.conf = HBaseConfiguration.create(conf);
92 decorateConf();
93 this.metrics = new MetricsSink();
94 }
95
96
97
98
99
100 private void decorateConf() {
101 this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
102 this.conf.getInt("replication.sink.client.retries.number", 4));
103 this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
104 this.conf.getInt("replication.sink.client.ops.timeout", 10000));
105 String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
106 if (StringUtils.isNotEmpty(replicationCodec)) {
107 this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
108 }
109 }
110
111
112
113
114
115
116
117
118 public void replicateEntries(List<WALEntry> entries, final CellScanner cells) throws IOException {
119 if (entries.isEmpty()) return;
120 if (cells == null) throw new NullPointerException("TODO: Add handling of null CellScanner");
121
122
123 try {
124 long totalReplicated = 0;
125
126
127 Map<TableName, Map<List<UUID>, List<Row>>> rowMap =
128 new TreeMap<TableName, Map<List<UUID>, List<Row>>>();
129 for (WALEntry entry : entries) {
130 TableName table =
131 TableName.valueOf(entry.getKey().getTableName().toByteArray());
132 Cell previousCell = null;
133 Mutation m = null;
134 int count = entry.getAssociatedCellCount();
135 for (int i = 0; i < count; i++) {
136
137 if (!cells.advance()) {
138 throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
139 }
140 Cell cell = cells.current();
141 if (isNewRowOrType(previousCell, cell)) {
142
143 m = CellUtil.isDelete(cell)?
144 new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()):
145 new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
146 List<UUID> clusterIds = new ArrayList<UUID>();
147 for(HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()){
148 clusterIds.add(toUUID(clusterId));
149 }
150 m.setClusterIds(clusterIds);
151 addToHashMultiMap(rowMap, table, clusterIds, m);
152 }
153 if (CellUtil.isDelete(cell)) {
154 ((Delete)m).addDeleteMarker(cell);
155 } else {
156 ((Put)m).add(cell);
157 }
158 previousCell = cell;
159 }
160 totalReplicated++;
161 }
162 for (Entry<TableName, Map<List<UUID>,List<Row>>> entry : rowMap.entrySet()) {
163 batch(entry.getKey(), entry.getValue().values());
164 }
165 int size = entries.size();
166 this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
167 this.metrics.applyBatch(size);
168 this.totalReplicatedEdits.addAndGet(totalReplicated);
169 } catch (IOException ex) {
170 LOG.error("Unable to accept edit because:", ex);
171 throw ex;
172 }
173 }
174
175
176
177
178
179
180 private boolean isNewRowOrType(final Cell previousCell, final Cell cell) {
181 return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() ||
182 !CellUtil.matchingRow(previousCell, cell);
183 }
184
185 private java.util.UUID toUUID(final HBaseProtos.UUID uuid) {
186 return new java.util.UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
187 }
188
189
190
191
192
193
194
195
196
197
198 private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2,List<V>>> map, K1 key1, K2 key2, V value) {
199 Map<K2,List<V>> innerMap = map.get(key1);
200 if (innerMap == null) {
201 innerMap = new HashMap<K2, List<V>>();
202 map.put(key1, innerMap);
203 }
204 List<V> values = innerMap.get(key2);
205 if (values == null) {
206 values = new ArrayList<V>();
207 innerMap.put(key2, values);
208 }
209 values.add(value);
210 return values;
211 }
212
213
214
215
216 public void stopReplicationSinkServices() {
217 try {
218 if (this.sharedHtableCon != null) {
219 synchronized (sharedHtableConLock) {
220 if (this.sharedHtableCon != null) {
221 this.sharedHtableCon.close();
222 this.sharedHtableCon = null;
223 }
224 }
225 }
226 } catch (IOException e) {
227 LOG.warn("IOException while closing the connection", e);
228 }
229 }
230
231
232
233
234
235
236
237
238 protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
239 if (allRows.isEmpty()) {
240 return;
241 }
242 Table table = null;
243 try {
244
245 Connection connection = this.sharedHtableCon;
246 if (connection == null) {
247 synchronized (sharedHtableConLock) {
248 connection = this.sharedHtableCon;
249 if (connection == null) {
250 connection = this.sharedHtableCon = ConnectionFactory.createConnection(this.conf);
251 }
252 }
253 }
254 table = connection.getTable(tableName);
255 for (List<Row> rows : allRows) {
256 table.batch(rows);
257 }
258 } catch (InterruptedException ix) {
259 throw (InterruptedIOException)new InterruptedIOException().initCause(ix);
260 } finally {
261 if (table != null) {
262 table.close();
263 }
264 }
265 }
266
267
268
269
270
271
272 public String getStats() {
273 return this.totalReplicatedEdits.get() == 0 ? "" : "Sink: " +
274 "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() +
275 ", total replicated edits: " + this.totalReplicatedEdits;
276 }
277
278
279
280
281
282 public MetricsSink getSinkMetrics() {
283 return this.metrics;
284 }
285 }