1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication.regionserver;
20
21 import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
22 import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY;
23 import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
24
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.NavigableMap;
29 import java.util.TreeMap;
30 import java.util.UUID;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.ScheduledExecutorService;
33 import java.util.concurrent.TimeUnit;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.hbase.classification.InterfaceAudience;
38 import org.apache.hadoop.conf.Configuration;
39 import org.apache.hadoop.fs.FileSystem;
40 import org.apache.hadoop.fs.Path;
41 import org.apache.hadoop.hbase.Cell;
42 import org.apache.hadoop.hbase.CellScanner;
43 import org.apache.hadoop.hbase.CellUtil;
44 import org.apache.hadoop.hbase.HConstants;
45 import org.apache.hadoop.hbase.HRegionInfo;
46 import org.apache.hadoop.hbase.HTableDescriptor;
47 import org.apache.hadoop.hbase.Server;
48 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
49 import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
50 import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
51 import org.apache.hadoop.hbase.wal.WALKey;
52 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
53 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
54 import org.apache.hadoop.hbase.replication.ReplicationException;
55 import org.apache.hadoop.hbase.replication.ReplicationFactory;
56 import org.apache.hadoop.hbase.replication.ReplicationPeers;
57 import org.apache.hadoop.hbase.replication.ReplicationQueues;
58 import org.apache.hadoop.hbase.replication.ReplicationTracker;
59 import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
60 import org.apache.hadoop.hbase.util.Bytes;
61 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
62 import org.apache.zookeeper.KeeperException;
63
64 import com.google.common.util.concurrent.ThreadFactoryBuilder;
65
66
67
68
69 @InterfaceAudience.Private
70 public class Replication extends WALActionsListener.Base implements
71 ReplicationSourceService, ReplicationSinkService {
72 private static final Log LOG =
73 LogFactory.getLog(Replication.class);
74 private boolean replication;
75 private ReplicationSourceManager replicationManager;
76 private ReplicationQueues replicationQueues;
77 private ReplicationPeers replicationPeers;
78 private ReplicationTracker replicationTracker;
79 private Configuration conf;
80 private ReplicationSink replicationSink;
81
82 private Server server;
83
84 private ScheduledExecutorService scheduleThreadPool;
85 private int statsThreadPeriod;
86
87 private ReplicationLoad replicationLoad;
88
89
90
91
92
93
94
95
96
/**
 * Creates a replication service and initializes it right away by delegating to
 * {@link #initialize(Server, FileSystem, Path, Path)}.
 *
 * @param server    the server hosting this replication service
 * @param fs        file system handed through to {@code initialize}
 * @param logDir    log directory handed through to {@code initialize}
 * @param oldLogDir old-log directory handed through to {@code initialize}
 * @throws IOException if initialization fails
 */
public Replication(final Server server, final FileSystem fs,
    final Path logDir, final Path oldLogDir) throws IOException {
  this.initialize(server, fs, logDir, oldLogDir);
}
101
102
103
104
105 public Replication() {
106 }
107
108 public void initialize(final Server server, final FileSystem fs,
109 final Path logDir, final Path oldLogDir) throws IOException {
110 this.server = server;
111 this.conf = this.server.getConfiguration();
112 this.replication = isReplication(this.conf);
113 this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
114 new ThreadFactoryBuilder()
115 .setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d")
116 .setDaemon(true)
117 .build());
118 if (replication) {
119 try {
120 this.replicationQueues =
121 ReplicationFactory.getReplicationQueues(server.getZooKeeper(), this.conf, this.server);
122 this.replicationQueues.init(this.server.getServerName().toString());
123 this.replicationPeers =
124 ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server);
125 this.replicationPeers.init();
126 this.replicationTracker =
127 ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers,
128 this.conf, this.server, this.server);
129 } catch (ReplicationException e) {
130 throw new IOException("Failed replication handler create", e);
131 }
132 UUID clusterId = null;
133 try {
134 clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
135 } catch (KeeperException ke) {
136 throw new IOException("Could not read cluster id", ke);
137 }
138 this.replicationManager =
139 new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker,
140 conf, this.server, fs, logDir, oldLogDir, clusterId);
141 this.statsThreadPeriod =
142 this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
143 LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
144 this.replicationLoad = new ReplicationLoad();
145 } else {
146 this.replicationManager = null;
147 this.replicationQueues = null;
148 this.replicationPeers = null;
149 this.replicationTracker = null;
150 this.replicationLoad = null;
151 }
152 }
153
154
155
156
157
158 public static boolean isReplication(final Configuration c) {
159 return c.getBoolean(REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
160 }
161
162
163
164
165 public WALActionsListener getWALActionsListener() {
166 return this;
167 }
168
169
170
171 public void stopReplicationService() {
172 join();
173 }
174
175
176
177
178 public void join() {
179 if (this.replication) {
180 this.replicationManager.join();
181 if (this.replicationSink != null) {
182 this.replicationSink.stopReplicationSinkServices();
183 }
184 }
185 scheduleThreadPool.shutdown();
186 }
187
188
189
190
191
192
193
194
195
196 public void replicateLogEntries(List<WALEntry> entries, CellScanner cells) throws IOException {
197 if (this.replication) {
198 this.replicationSink.replicateEntries(entries, cells);
199 }
200 }
201
202
203
204
205
206
207 public void startReplicationService() throws IOException {
208 if (this.replication) {
209 try {
210 this.replicationManager.init();
211 } catch (ReplicationException e) {
212 throw new IOException(e);
213 }
214 this.replicationSink = new ReplicationSink(this.conf, this.server);
215 this.scheduleThreadPool.scheduleAtFixedRate(
216 new ReplicationStatisticsThread(this.replicationSink, this.replicationManager),
217 statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
218 }
219 }
220
221
222
223
224
225 public ReplicationSourceManager getReplicationManager() {
226 return this.replicationManager;
227 }
228
229 @Override
230 public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey,
231 WALEdit logEdit) {
232 scopeWALEdits(htd, logKey, logEdit);
233 }
234
235
236
237
238
239
240
241
242 public static void scopeWALEdits(HTableDescriptor htd, WALKey logKey,
243 WALEdit logEdit) {
244 NavigableMap<byte[], Integer> scopes =
245 new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
246 byte[] family;
247 for (Cell cell : logEdit.getCells()) {
248 family = cell.getFamily();
249
250 if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) continue;
251
252 assert htd.getFamily(family) != null;
253
254 int scope = htd.getFamily(family).getScope();
255 if (scope != REPLICATION_SCOPE_LOCAL &&
256 !scopes.containsKey(family)) {
257 scopes.put(family, scope);
258 }
259 }
260 if (!scopes.isEmpty()) {
261 logKey.setScopes(scopes);
262 }
263 }
264
265 @Override
266 public void preLogRoll(Path oldPath, Path newPath) throws IOException {
267 getReplicationManager().preLogRoll(newPath);
268 }
269
270 @Override
271 public void postLogRoll(Path oldPath, Path newPath) throws IOException {
272 getReplicationManager().postLogRoll(newPath);
273 }
274
275
276
277
278
279
280 public static void decorateMasterConfiguration(Configuration conf) {
281 if (!isReplication(conf)) {
282 return;
283 }
284 String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS);
285 String cleanerClass = ReplicationLogCleaner.class.getCanonicalName();
286 if (!plugins.contains(cleanerClass)) {
287 conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
288 }
289 }
290
291
292
293
294 static class ReplicationStatisticsThread extends Thread {
295
296 private final ReplicationSink replicationSink;
297 private final ReplicationSourceManager replicationManager;
298
299 public ReplicationStatisticsThread(final ReplicationSink replicationSink,
300 final ReplicationSourceManager replicationManager) {
301 super("ReplicationStatisticsThread");
302 this.replicationManager = replicationManager;
303 this.replicationSink = replicationSink;
304 }
305
306 @Override
307 public void run() {
308 printStats(this.replicationManager.getStats());
309 printStats(this.replicationSink.getStats());
310 }
311
312 private void printStats(String stats) {
313 if (!stats.isEmpty()) {
314 LOG.info(stats);
315 }
316 }
317 }
318
319 @Override
320 public ReplicationLoad refreshAndGetReplicationLoad() {
321 if (this.replicationLoad == null) {
322 return null;
323 }
324
325 buildReplicationLoad();
326 return this.replicationLoad;
327 }
328
329 private void buildReplicationLoad() {
330 List<MetricsSource> sourceMetricsList = new ArrayList<MetricsSource>();
331
332
333 List<ReplicationSourceInterface> sources = this.replicationManager.getSources();
334 for (ReplicationSourceInterface source : sources) {
335 if (source instanceof ReplicationSource) {
336 sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics());
337 }
338 }
339
340
341 List<ReplicationSourceInterface> oldSources = this.replicationManager.getOldSources();
342 for (ReplicationSourceInterface source : oldSources) {
343 if (source instanceof ReplicationSource) {
344 sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics());
345 }
346 }
347
348
349 MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
350 this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics);
351 }
352 }