1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import java.io.IOException;
22  import java.util.NavigableMap;
23  import java.util.TreeMap;
24  import java.util.concurrent.Executors;
25  import java.util.concurrent.ScheduledExecutorService;
26  import java.util.concurrent.TimeUnit;
27  import java.util.concurrent.atomic.AtomicBoolean;
28  
29  import com.google.common.util.concurrent.ThreadFactoryBuilder;
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.classification.InterfaceAudience;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.fs.FileSystem;
35  import org.apache.hadoop.fs.Path;
36  import org.apache.hadoop.hbase.HRegionInfo;
37  import org.apache.hadoop.hbase.HTableDescriptor;
38  import org.apache.hadoop.hbase.KeyValue;
39  import org.apache.hadoop.hbase.Server;
40  import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
41  import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
42  import org.apache.hadoop.hbase.regionserver.wal.HLog;
43  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
44  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
45  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
46  import org.apache.hadoop.hbase.replication.ReplicationQueues;
47  import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
48  import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
49  import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
50  import org.apache.hadoop.hbase.util.Bytes;
51  import org.apache.zookeeper.KeeperException;
52  
53  import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
54  import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY;
55  import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
56  
57  /**
58   * Gateway to Replication.  Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
59   */
60  @InterfaceAudience.Private
61  public class Replication implements WALActionsListener, 
62    ReplicationSourceService, ReplicationSinkService {
63    private static final Log LOG =
64        LogFactory.getLog(Replication.class);
65    private boolean replication;
66    private ReplicationSourceManager replicationManager;
67    private final AtomicBoolean replicating = new AtomicBoolean(true);
68    private ReplicationZookeeper zkHelper;
69    private ReplicationQueues replicationQueues;
70    private Configuration conf;
71    private ReplicationSink replicationSink;
72    // Hosting server
73    private Server server;
74    /** Statistics thread schedule pool */
75    private ScheduledExecutorService scheduleThreadPool;
76    private int statsThreadPeriod;
77  
78    /**
79     * Instantiate the replication management (if rep is enabled).
80     * @param server Hosting server
81     * @param fs handle to the filesystem
82     * @param logDir
83     * @param oldLogDir directory where logs are archived
84     * @throws IOException
85     */
86    public Replication(final Server server, final FileSystem fs,
87        final Path logDir, final Path oldLogDir) throws IOException{
88      initialize(server, fs, logDir, oldLogDir);
89    }
90  
91    /**
92     * Empty constructor
93     */
94    public Replication() {
95    }
96  
97    public void initialize(final Server server, final FileSystem fs,
98        final Path logDir, final Path oldLogDir) throws IOException {
99      this.server = server;
100     this.conf = this.server.getConfiguration();
101     this.replication = isReplication(this.conf);
102     this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
103       new ThreadFactoryBuilder()
104         .setNameFormat(server.getServerName() + "Replication Statistics #%d")
105         .setDaemon(true)
106         .build());
107     if (replication) {
108       try {
109         this.zkHelper = new ReplicationZookeeper(server, this.replicating);
110         this.replicationQueues =
111             new ReplicationQueuesZKImpl(server.getZooKeeper(), this.conf, this.server);
112         this.replicationQueues.init(this.server.getServerName().toString());
113       } catch (KeeperException ke) {
114         throw new IOException("Failed replication handler create " +
115            "(replicating=" + this.replicating, ke);
116       }
117       this.replicationManager =
118           new ReplicationSourceManager(zkHelper, replicationQueues, conf, this.server, fs,
119               this.replicating, logDir, oldLogDir);
120       this.statsThreadPeriod =
121           this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
122       LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
123     } else {
124       this.replicationManager = null;
125       this.zkHelper = null;
126       this.replicationQueues = null;
127     }
128   }
129 
130    /**
131     * @param c Configuration to look at
132     * @return True if replication is enabled.
133     */
134   public static boolean isReplication(final Configuration c) {
135     return c.getBoolean(REPLICATION_ENABLE_KEY, false);
136   }
137 
138    /*
139     * Returns an object to listen to new hlog changes
140     **/
141   public WALActionsListener getWALActionsListener() {
142     return this;
143   }
144   /**
145    * Stops replication service.
146    */
147   public void stopReplicationService() {
148     join();
149   }
150 
151   /**
152    * Join with the replication threads
153    */
154   public void join() {
155     if (this.replication) {
156       this.replicationManager.join();
157       if (this.replicationSink != null) {
158         this.replicationSink.stopReplicationSinkServices();
159       }
160     }
161   }
162 
163   /**
164    * Carry on the list of log entries down to the sink
165    * @param entries list of entries to replicate
166    * @throws IOException
167    */
168   public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
169     if (this.replication) {
170       this.replicationSink.replicateEntries(entries);
171     }
172   }
173 
174   /**
175    * If replication is enabled and this cluster is a master,
176    * it starts
177    * @throws IOException
178    */
179   public void startReplicationService() throws IOException {
180     if (this.replication) {
181       this.replicationManager.init();
182       this.replicationSink = new ReplicationSink(this.conf, this.server);
183       this.scheduleThreadPool.scheduleAtFixedRate(
184         new ReplicationStatisticsThread(this.replicationSink, this.replicationManager),
185         statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
186     }
187   }
188 
189   /**
190    * Get the replication sources manager
191    * @return the manager if replication is enabled, else returns false
192    */
193   public ReplicationSourceManager getReplicationManager() {
194     return this.replicationManager;
195   }
196 
197   @Override
198   public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
199       WALEdit logEdit) {
200     // Not interested
201   }
202 
203   @Override
204   public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
205                                        WALEdit logEdit) {
206     NavigableMap<byte[], Integer> scopes =
207         new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
208     byte[] family;
209     for (KeyValue kv : logEdit.getKeyValues()) {
210       family = kv.getFamily();
211       int scope = htd.getFamily(family).getScope();
212       if (scope != REPLICATION_SCOPE_LOCAL &&
213           !scopes.containsKey(family)) {
214         scopes.put(family, scope);
215       }
216     }
217     if (!scopes.isEmpty()) {
218       logKey.setScopes(scopes);
219     }
220   }
221 
222   @Override
223   public void preLogRoll(Path oldPath, Path newPath) throws IOException {
224     getReplicationManager().preLogRoll(newPath);
225   }
226 
227   @Override
228   public void postLogRoll(Path oldPath, Path newPath) throws IOException {
229     getReplicationManager().postLogRoll(newPath);
230   }
231 
232   @Override
233   public void preLogArchive(Path oldPath, Path newPath) throws IOException {
234     // Not interested
235   }
236 
237   @Override
238   public void postLogArchive(Path oldPath, Path newPath) throws IOException {
239     // Not interested
240   }
241 
242   /**
243    * This method modifies the master's configuration in order to inject
244    * replication-related features
245    * @param conf
246    */
247   public static void decorateMasterConfiguration(Configuration conf) {
248     if (!isReplication(conf)) {
249       return;
250     }
251     String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS);
252     String cleanerClass = ReplicationLogCleaner.class.getCanonicalName();
253     if (!plugins.contains(cleanerClass)) {
254       conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
255     }
256   }
257 
258   @Override
259   public void logRollRequested() {
260     // Not interested
261   }
262 
263   @Override
264   public void logCloseRequested() {
265     // not interested
266   }
267 
268   /*
269    * Statistics thread. Periodically prints the cache statistics to the log.
270    */
271   static class ReplicationStatisticsThread extends Thread {
272 
273     private final ReplicationSink replicationSink;
274     private final ReplicationSourceManager replicationManager;
275 
276     public ReplicationStatisticsThread(final ReplicationSink replicationSink,
277                             final ReplicationSourceManager replicationManager) {
278       super("ReplicationStatisticsThread");
279       this.replicationManager = replicationManager;
280       this.replicationSink = replicationSink;
281     }
282 
283     @Override
284     public void run() {
285       printStats(this.replicationManager.getStats());
286       printStats(this.replicationSink.getStats());
287     }
288 
289     private void printStats(String stats) {
290       if (!stats.isEmpty()) {
291         LOG.info(stats);
292       }
293     }
294   }
295 }