1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication.regionserver;
20
21 import java.io.IOException;
22 import java.util.NavigableMap;
23 import java.util.TreeMap;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.ScheduledExecutorService;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicBoolean;
28
29 import com.google.common.util.concurrent.ThreadFactoryBuilder;
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.classification.InterfaceAudience;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.fs.FileSystem;
35 import org.apache.hadoop.fs.Path;
36 import org.apache.hadoop.hbase.HRegionInfo;
37 import org.apache.hadoop.hbase.HTableDescriptor;
38 import org.apache.hadoop.hbase.KeyValue;
39 import org.apache.hadoop.hbase.Server;
40 import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
41 import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
42 import org.apache.hadoop.hbase.regionserver.wal.HLog;
43 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
44 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
45 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
46 import org.apache.hadoop.hbase.replication.ReplicationQueues;
47 import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
48 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
49 import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
50 import org.apache.hadoop.hbase.util.Bytes;
51 import org.apache.zookeeper.KeeperException;
52
53 import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
54 import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY;
55 import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
56
57
58
59
60 @InterfaceAudience.Private
61 public class Replication implements WALActionsListener,
62 ReplicationSourceService, ReplicationSinkService {
63 private static final Log LOG =
64 LogFactory.getLog(Replication.class);
65 private boolean replication;
66 private ReplicationSourceManager replicationManager;
67 private final AtomicBoolean replicating = new AtomicBoolean(true);
68 private ReplicationZookeeper zkHelper;
69 private ReplicationQueues replicationQueues;
70 private Configuration conf;
71 private ReplicationSink replicationSink;
72
73 private Server server;
74
75 private ScheduledExecutorService scheduleThreadPool;
76 private int statsThreadPeriod;
77
78
79
80
81
82
83
84
85
86 public Replication(final Server server, final FileSystem fs,
87 final Path logDir, final Path oldLogDir) throws IOException{
88 initialize(server, fs, logDir, oldLogDir);
89 }
90
91
92
93
94 public Replication() {
95 }
96
97 public void initialize(final Server server, final FileSystem fs,
98 final Path logDir, final Path oldLogDir) throws IOException {
99 this.server = server;
100 this.conf = this.server.getConfiguration();
101 this.replication = isReplication(this.conf);
102 this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
103 new ThreadFactoryBuilder()
104 .setNameFormat(server.getServerName() + "Replication Statistics #%d")
105 .setDaemon(true)
106 .build());
107 if (replication) {
108 try {
109 this.zkHelper = new ReplicationZookeeper(server, this.replicating);
110 this.replicationQueues =
111 new ReplicationQueuesZKImpl(server.getZooKeeper(), this.conf, this.server);
112 this.replicationQueues.init(this.server.getServerName().toString());
113 } catch (KeeperException ke) {
114 throw new IOException("Failed replication handler create " +
115 "(replicating=" + this.replicating, ke);
116 }
117 this.replicationManager =
118 new ReplicationSourceManager(zkHelper, replicationQueues, conf, this.server, fs,
119 this.replicating, logDir, oldLogDir);
120 this.statsThreadPeriod =
121 this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
122 LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
123 } else {
124 this.replicationManager = null;
125 this.zkHelper = null;
126 this.replicationQueues = null;
127 }
128 }
129
130
131
132
133
134 public static boolean isReplication(final Configuration c) {
135 return c.getBoolean(REPLICATION_ENABLE_KEY, false);
136 }
137
138
139
140
141 public WALActionsListener getWALActionsListener() {
142 return this;
143 }
144
145
146
147 public void stopReplicationService() {
148 join();
149 }
150
151
152
153
154 public void join() {
155 if (this.replication) {
156 this.replicationManager.join();
157 if (this.replicationSink != null) {
158 this.replicationSink.stopReplicationSinkServices();
159 }
160 }
161 }
162
163
164
165
166
167
168 public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
169 if (this.replication) {
170 this.replicationSink.replicateEntries(entries);
171 }
172 }
173
174
175
176
177
178
179 public void startReplicationService() throws IOException {
180 if (this.replication) {
181 this.replicationManager.init();
182 this.replicationSink = new ReplicationSink(this.conf, this.server);
183 this.scheduleThreadPool.scheduleAtFixedRate(
184 new ReplicationStatisticsThread(this.replicationSink, this.replicationManager),
185 statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
186 }
187 }
188
189
190
191
192
193 public ReplicationSourceManager getReplicationManager() {
194 return this.replicationManager;
195 }
196
197 @Override
198 public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
199 WALEdit logEdit) {
200
201 }
202
203 @Override
204 public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
205 WALEdit logEdit) {
206 NavigableMap<byte[], Integer> scopes =
207 new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
208 byte[] family;
209 for (KeyValue kv : logEdit.getKeyValues()) {
210 family = kv.getFamily();
211 int scope = htd.getFamily(family).getScope();
212 if (scope != REPLICATION_SCOPE_LOCAL &&
213 !scopes.containsKey(family)) {
214 scopes.put(family, scope);
215 }
216 }
217 if (!scopes.isEmpty()) {
218 logKey.setScopes(scopes);
219 }
220 }
221
222 @Override
223 public void preLogRoll(Path oldPath, Path newPath) throws IOException {
224 getReplicationManager().preLogRoll(newPath);
225 }
226
227 @Override
228 public void postLogRoll(Path oldPath, Path newPath) throws IOException {
229 getReplicationManager().postLogRoll(newPath);
230 }
231
232 @Override
233 public void preLogArchive(Path oldPath, Path newPath) throws IOException {
234
235 }
236
237 @Override
238 public void postLogArchive(Path oldPath, Path newPath) throws IOException {
239
240 }
241
242
243
244
245
246
247 public static void decorateMasterConfiguration(Configuration conf) {
248 if (!isReplication(conf)) {
249 return;
250 }
251 String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS);
252 String cleanerClass = ReplicationLogCleaner.class.getCanonicalName();
253 if (!plugins.contains(cleanerClass)) {
254 conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
255 }
256 }
257
258 @Override
259 public void logRollRequested() {
260
261 }
262
263 @Override
264 public void logCloseRequested() {
265
266 }
267
268
269
270
271 static class ReplicationStatisticsThread extends Thread {
272
273 private final ReplicationSink replicationSink;
274 private final ReplicationSourceManager replicationManager;
275
276 public ReplicationStatisticsThread(final ReplicationSink replicationSink,
277 final ReplicationSourceManager replicationManager) {
278 super("ReplicationStatisticsThread");
279 this.replicationManager = replicationManager;
280 this.replicationSink = replicationSink;
281 }
282
283 @Override
284 public void run() {
285 printStats(this.replicationManager.getStats());
286 printStats(this.replicationSink.getStats());
287 }
288
289 private void printStats(String stats) {
290 if (!stats.isEmpty()) {
291 LOG.info(stats);
292 }
293 }
294 }
295 }