001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.replication;
020
021import java.io.IOException;
022import java.util.List;
023import java.util.UUID;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.TimeoutException;
026
027import org.apache.hadoop.hbase.Abortable;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.hbase.HBaseInterfaceAudience;
032import org.apache.hadoop.hbase.Server;
033import org.apache.hadoop.hbase.TableDescriptors;
034import org.apache.hadoop.hbase.wal.WAL.Entry;
035import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
036
037/**
038 * ReplicationEndpoint is a plugin which implements replication
039 * to other HBase clusters, or other systems. ReplicationEndpoint implementation
040 * can be specified at the peer creation time by specifying it
041 * in the {@link ReplicationPeerConfig}. A ReplicationEndpoint is run in a thread
042 * in each region server in the same process.
043 * <p>
044 * ReplicationEndpoint is closely tied to ReplicationSource in a producer-consumer
045 * relation. ReplicationSource is an HBase-private class which tails the logs and manages
046 * the queue of logs plus management and persistence of all the state for replication.
047 * ReplicationEndpoint on the other hand is responsible for doing the actual shipping
048 * and persisting of the WAL entries in the other cluster.
049 */
050@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
051public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
052  // TODO: This class needs doc. Has a Context and a ReplicationContext. Then has #start, #stop.
053  // How they relate? Do we #start before #init(Context)? We fail fast if you don't?
054
055  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
056  class Context {
057    private final Server server;
058    private final Configuration localConf;
059    private final Configuration conf;
060    private final FileSystem fs;
061    private final TableDescriptors tableDescriptors;
062    private final ReplicationPeer replicationPeer;
063    private final String peerId;
064    private final UUID clusterId;
065    private final MetricsSource metrics;
066    private final Abortable abortable;
067
068    @InterfaceAudience.Private
069    public Context(final Server server, final Configuration localConf, final Configuration conf,
070        final FileSystem fs, final String peerId, final UUID clusterId,
071        final ReplicationPeer replicationPeer, final MetricsSource metrics,
072        final TableDescriptors tableDescriptors, final Abortable abortable) {
073      this.server = server;
074      this.localConf = localConf;
075      this.conf = conf;
076      this.fs = fs;
077      this.clusterId = clusterId;
078      this.peerId = peerId;
079      this.replicationPeer = replicationPeer;
080      this.metrics = metrics;
081      this.tableDescriptors = tableDescriptors;
082      this.abortable = abortable;
083    }
084
085    public Server getServer() {
086      return server;
087    }
088
089    public Configuration getConfiguration() {
090      return conf;
091    }
092
093    public Configuration getLocalConfiguration() {
094      return localConf;
095    }
096
097    public FileSystem getFilesystem() {
098      return fs;
099    }
100
101    public UUID getClusterId() {
102      return clusterId;
103    }
104
105    public String getPeerId() {
106      return peerId;
107    }
108
109    public ReplicationPeerConfig getPeerConfig() {
110      return replicationPeer.getPeerConfig();
111    }
112
113    public ReplicationPeer getReplicationPeer() {
114      return replicationPeer;
115    }
116
117    public MetricsSource getMetrics() {
118      return metrics;
119    }
120
121    public TableDescriptors getTableDescriptors() {
122      return tableDescriptors;
123    }
124
125    public Abortable getAbortable() {
126      return abortable;
127    }
128  }
129
130  /**
131   * Initialize the replication endpoint with the given context.
132   * @param context replication context
133   * @throws IOException error occur when initialize the endpoint.
134   */
135  void init(Context context) throws IOException;
136
137  /** Whether or not, the replication endpoint can replicate to it's source cluster with the same
138   * UUID */
139  boolean canReplicateToSameCluster();
140
141  /**
142   * Returns a UUID of the provided peer id. Every HBase cluster instance has a persisted
143   * associated UUID. If the replication is not performed to an actual HBase cluster (but
144   * some other system), the UUID returned has to uniquely identify the connected target system.
145   * @return a UUID or null if the peer cluster does not exist or is not connected.
146   */
147  UUID getPeerUUID();
148
149  /**
150   * Returns a WALEntryFilter to use for filtering out WALEntries from the log. Replication
151   * infrastructure will call this filter before sending the edits to shipEdits().
152   * @return a {@link WALEntryFilter} or null.
153   */
154  WALEntryFilter getWALEntryfilter();
155
156  /**
157   * A context for {@link ReplicationEndpoint#replicate(ReplicateContext)} method.
158   */
159  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
160  static class ReplicateContext {
161    List<Entry> entries;
162    int size;
163    String walGroupId;
164    int timeout;
165    @InterfaceAudience.Private
166    public ReplicateContext() {
167    }
168
169    public ReplicateContext setEntries(List<Entry> entries) {
170      this.entries = entries;
171      return this;
172    }
173    public ReplicateContext setSize(int size) {
174      this.size = size;
175      return this;
176    }
177    public ReplicateContext setWalGroupId(String walGroupId) {
178      this.walGroupId = walGroupId;
179      return this;
180    }
181    public List<Entry> getEntries() {
182      return entries;
183    }
184    public int getSize() {
185      return size;
186    }
187    public String getWalGroupId(){
188      return walGroupId;
189    }
190    public void setTimeout(int timeout) {
191      this.timeout = timeout;
192    }
193    public int getTimeout() {
194      return this.timeout;
195    }
196  }
197
198  /**
199   * Replicate the given set of entries (in the context) to the other cluster.
200   * Can block until all the given entries are replicated. Upon this method is returned,
201   * all entries that were passed in the context are assumed to be persisted in the
202   * target cluster.
203   * @param replicateContext a context where WAL entries and other
204   * parameters can be obtained.
205   */
206  boolean replicate(ReplicateContext replicateContext);
207
208
209  // The below methods are inspired by Guava Service. See
210  // https://github.com/google/guava/wiki/ServiceExplained for overview of Guava Service.
211  // Below we implement a subset only with different names on some methods so we can implement
212  // the below internally using Guava (without exposing our implementation to
213  // ReplicationEndpoint implementors.
214
215  /**
216   * Returns {@code true} if this service is RUNNING.
217   */
218  boolean isRunning();
219
220  /**
221   * @return Return {@code true} is this service is STARTING (but not yet RUNNING).
222   */
223  boolean isStarting();
224
225  /**
226   * Initiates service startup and returns immediately. A stopped service may not be restarted.
227   * Equivalent of startAsync call in Guava Service.
228   * @throws IllegalStateException if the service is not new, if it has been run already.
229   */
230  void start();
231
232  /**
233   * Waits for the {@link ReplicationEndpoint} to be up and running.
234   *
235   * @throws IllegalStateException if the service reaches a state from which it is not possible to
236   *     enter the (internal) running state. e.g. if the state is terminated when this method is
237   *     called then this will throw an IllegalStateException.
238   */
239  void awaitRunning();
240
241  /**
242   * Waits for the {@link ReplicationEndpoint} to to be up and running for no more
243   * than the given time.
244   *
245   * @param timeout the maximum time to wait
246   * @param unit the time unit of the timeout argument
247   * @throws TimeoutException if the service has not reached the given state within the deadline
248   * @throws IllegalStateException if the service reaches a state from which it is not possible to
249   *     enter the (internal) running state. e.g. if the state is terminated when this method is
250   *     called then this will throw an IllegalStateException.
251   */
252  void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException;
253
254  /**
255   * If the service is starting or running, this initiates service shutdown and returns immediately.
256   * If the service has already been stopped, this method returns immediately without taking action.
257   * Equivalent of stopAsync call in Guava Service.
258   */
259  void stop();
260
261  /**
262   * Waits for the {@link ReplicationEndpoint} to reach the terminated (internal) state.
263   *
264   * @throws IllegalStateException if the service FAILED.
265   */
266  void awaitTerminated();
267
268  /**
269   * Waits for the {@link ReplicationEndpoint} to reach a terminal state for no
270   * more than the given time.
271   *
272   * @param timeout the maximum time to wait
273   * @param unit the time unit of the timeout argument
274   * @throws TimeoutException if the service has not reached the given state within the deadline
275   * @throws IllegalStateException if the service FAILED.
276   */
277  void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException;
278
279  /**
280   * Returns the {@link Throwable} that caused this service to fail.
281   *
282   * @throws IllegalStateException if this service's state isn't FAILED.
283   */
284  Throwable failureCause();
285}