View Javadoc

1   /*
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.replication.regionserver;
21  
22  import java.io.IOException;
23  import java.util.NavigableMap;
24  import java.util.TreeMap;
25  import java.util.concurrent.atomic.AtomicBoolean;
26  
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.fs.FileSystem;
29  import org.apache.hadoop.fs.Path;
30  import org.apache.hadoop.hbase.HRegionInfo;
31  import org.apache.hadoop.hbase.HTableDescriptor;
32  import org.apache.hadoop.hbase.KeyValue;
33  import org.apache.hadoop.hbase.Server;
34  import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
35  import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
36  import org.apache.hadoop.hbase.regionserver.wal.HLog;
37  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
38  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
39  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
40  import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
41  import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
42  import org.apache.hadoop.hbase.util.Bytes;
43  import org.apache.zookeeper.KeeperException;
44  
45  import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
46  import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY;
47  import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
48  
49  /**
50   * Gateway to Replication.  Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
51   */
52  public class Replication implements WALActionsListener, 
53    ReplicationSourceService, ReplicationSinkService {
54    private boolean replication;
55    private ReplicationSourceManager replicationManager;
56    private final AtomicBoolean replicating = new AtomicBoolean(true);
57    private ReplicationZookeeper zkHelper;
58    private Configuration conf;
59    private ReplicationSink replicationSink;
60    // Hosting server
61    private Server server;
62  
63    /**
64     * Instantiate the replication management (if rep is enabled).
65     * @param server Hosting server
66     * @param fs handle to the filesystem
67     * @param logDir
68     * @param oldLogDir directory where logs are archived
69     * @throws IOException
70     */
71    public Replication(final Server server, final FileSystem fs,
72        final Path logDir, final Path oldLogDir) throws IOException{
73      initialize(server, fs, logDir, oldLogDir);
74    }
75  
76    /**
77     * Empty constructor
78     */
79    public Replication() {
80    }
81  
82    public void initialize(final Server server, final FileSystem fs,
83        final Path logDir, final Path oldLogDir) throws IOException {
84      this.server = server;
85      this.conf = this.server.getConfiguration();
86      this.replication = isReplication(this.conf);
87      if (replication) {
88        try {
89          this.zkHelper = new ReplicationZookeeper(server, this.replicating);
90        } catch (KeeperException ke) {
91          throw new IOException("Failed replication handler create " +
92             "(replicating=" + this.replicating, ke);
93        }
94        this.replicationManager = new ReplicationSourceManager(zkHelper, conf,
95            this.server, fs, this.replicating, logDir, oldLogDir) ;
96      } else {
97        this.replicationManager = null;
98        this.zkHelper = null;
99      }
100   }
101 
102    /**
103     * @param c Configuration to look at
104     * @return True if replication is enabled.
105     */
106   public static boolean isReplication(final Configuration c) {
107     return c.getBoolean(REPLICATION_ENABLE_KEY, false);
108   }
109 
110    /*
111     * Returns an object to listen to new hlog changes
112     **/
113   public WALActionsListener getWALActionsListener() {
114     return this;
115   }
116   /**
117    * Stops replication service.
118    */
119   public void stopReplicationService() {
120     join();
121   }
122 
123   /**
124    * Join with the replication threads
125    */
126   public void join() {
127     if (this.replication) {
128       this.replicationManager.join();
129       if (this.replicationSink != null) {
130         this.replicationSink.stopReplicationSinkServices();
131       }
132     }
133   }
134 
135   /**
136    * Carry on the list of log entries down to the sink
137    * @param entries list of entries to replicate
138    * @throws IOException
139    */
140   public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
141     if (this.replication) {
142       this.replicationSink.replicateEntries(entries);
143     }
144   }
145 
146   /**
147    * If replication is enabled and this cluster is a master,
148    * it starts
149    * @throws IOException
150    */
151   public void startReplicationService() throws IOException {
152     if (this.replication) {
153       this.replicationManager.init();
154       this.replicationSink = new ReplicationSink(this.conf, this.server);
155     }
156   }
157 
158   /**
159    * Get the replication sources manager
160    * @return the manager if replication is enabled, else returns false
161    */
162   public ReplicationSourceManager getReplicationManager() {
163     return this.replicationManager;
164   }
165 
166   @Override
167   public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
168       WALEdit logEdit) {
169     // Not interested
170   }
171 
172   @Override
173   public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
174                                        WALEdit logEdit) {
175     byte[] family;
176     for (KeyValue kv : logEdit.getKeyValues()) {
177       family = kv.getFamily();
178       int scope = htd.getFamily(family).getScope();
179       if (scope != REPLICATION_SCOPE_LOCAL &&
180           !logEdit.hasKeyInScope(family)) {
181         logEdit.putIntoScope(family, scope);
182       }
183     }
184   }
185 
186   @Override
187   public void preLogRoll(Path oldPath, Path newPath) throws IOException {
188     getReplicationManager().preLogRoll(newPath);
189   }
190 
191   @Override
192   public void postLogRoll(Path oldPath, Path newPath) throws IOException {
193     getReplicationManager().postLogRoll(newPath);
194   }
195 
196   @Override
197   public void preLogArchive(Path oldPath, Path newPath) throws IOException {
198     // Not interested
199   }
200 
201   @Override
202   public void postLogArchive(Path oldPath, Path newPath) throws IOException {
203     // Not interested
204   }
205 
206   /**
207    * This method modifies the master's configuration in order to inject
208    * replication-related features
209    * @param conf
210    */
211   public static void decorateMasterConfiguration(Configuration conf) {
212     if (!isReplication(conf)) {
213       return;
214     }
215     String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS);
216     String cleanerClass = ReplicationLogCleaner.class.getCanonicalName();
217     if (!plugins.contains(cleanerClass)) {
218       conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
219     }
220   }
221 
222   @Override
223   public void logRollRequested() {
224     // Not interested
225   }
226 
227   @Override
228   public void logCloseRequested() {
229     // not interested
230   }
231 }