001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.replication;
020
021import java.io.IOException;
022import java.util.List;
023import java.util.UUID;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.TimeoutException;
026
027import org.apache.hadoop.hbase.Abortable;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.hbase.HBaseInterfaceAudience;
032import org.apache.hadoop.hbase.TableDescriptors;
033import org.apache.hadoop.hbase.wal.WAL.Entry;
034import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
035
036/**
037 * ReplicationEndpoint is a plugin which implements replication
038 * to other HBase clusters, or other systems. ReplicationEndpoint implementation
039 * can be specified at the peer creation time by specifying it
040 * in the {@link ReplicationPeerConfig}. A ReplicationEndpoint is run in a thread
041 * in each region server in the same process.
042 * <p>
043 * ReplicationEndpoint is closely tied to ReplicationSource in a producer-consumer
044 * relation. ReplicationSource is an HBase-private class which tails the logs and manages
045 * the queue of logs plus management and persistence of all the state for replication.
046 * ReplicationEndpoint on the other hand is responsible for doing the actual shipping
047 * and persisting of the WAL entries in the other cluster.
048 */
049@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
050public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
051  // TODO: This class needs doc. Has a Context and a ReplicationContext. Then has #start, #stop.
052  // How they relate? Do we #start before #init(Context)? We fail fast if you don't?
053
054  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
055  class Context {
056    private final Configuration localConf;
057    private final Configuration conf;
058    private final FileSystem fs;
059    private final TableDescriptors tableDescriptors;
060    private final ReplicationPeer replicationPeer;
061    private final String peerId;
062    private final UUID clusterId;
063    private final MetricsSource metrics;
064    private final Abortable abortable;
065
066    @InterfaceAudience.Private
067    public Context(
068        final Configuration localConf,
069        final Configuration conf,
070        final FileSystem fs,
071        final String peerId,
072        final UUID clusterId,
073        final ReplicationPeer replicationPeer,
074        final MetricsSource metrics,
075        final TableDescriptors tableDescriptors,
076        final Abortable abortable) {
077      this.localConf = localConf;
078      this.conf = conf;
079      this.fs = fs;
080      this.clusterId = clusterId;
081      this.peerId = peerId;
082      this.replicationPeer = replicationPeer;
083      this.metrics = metrics;
084      this.tableDescriptors = tableDescriptors;
085      this.abortable = abortable;
086    }
087    public Configuration getConfiguration() {
088      return conf;
089    }
090    public Configuration getLocalConfiguration() {
091      return localConf;
092    }
093    public FileSystem getFilesystem() {
094      return fs;
095    }
096    public UUID getClusterId() {
097      return clusterId;
098    }
099    public String getPeerId() {
100      return peerId;
101    }
102    public ReplicationPeerConfig getPeerConfig() {
103      return replicationPeer.getPeerConfig();
104    }
105    public ReplicationPeer getReplicationPeer() {
106      return replicationPeer;
107    }
108    public MetricsSource getMetrics() {
109      return metrics;
110    }
111    public TableDescriptors getTableDescriptors() {
112      return tableDescriptors;
113    }
114    public Abortable getAbortable() { return abortable; }
115  }
116
117  /**
118   * Initialize the replication endpoint with the given context.
119   * @param context replication context
120   * @throws IOException
121   */
122  void init(Context context) throws IOException;
123
124  /** Whether or not, the replication endpoint can replicate to it's source cluster with the same
125   * UUID */
126  boolean canReplicateToSameCluster();
127
128  /**
129   * Returns a UUID of the provided peer id. Every HBase cluster instance has a persisted
130   * associated UUID. If the replication is not performed to an actual HBase cluster (but
131   * some other system), the UUID returned has to uniquely identify the connected target system.
132   * @return a UUID or null if the peer cluster does not exist or is not connected.
133   */
134  UUID getPeerUUID();
135
136  /**
137   * Returns a WALEntryFilter to use for filtering out WALEntries from the log. Replication
138   * infrastructure will call this filter before sending the edits to shipEdits().
139   * @return a {@link WALEntryFilter} or null.
140   */
141  WALEntryFilter getWALEntryfilter();
142
143  /**
144   * A context for {@link ReplicationEndpoint#replicate(ReplicateContext)} method.
145   */
146  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
147  static class ReplicateContext {
148    List<Entry> entries;
149    int size;
150    String walGroupId;
151    @InterfaceAudience.Private
152    public ReplicateContext() {
153    }
154
155    public ReplicateContext setEntries(List<Entry> entries) {
156      this.entries = entries;
157      return this;
158    }
159    public ReplicateContext setSize(int size) {
160      this.size = size;
161      return this;
162    }
163    public ReplicateContext setWalGroupId(String walGroupId) {
164      this.walGroupId = walGroupId;
165      return this;
166    }
167    public List<Entry> getEntries() {
168      return entries;
169    }
170    public int getSize() {
171      return size;
172    }
173    public String getWalGroupId(){
174      return walGroupId;
175    }
176  }
177
178  /**
179   * Replicate the given set of entries (in the context) to the other cluster.
180   * Can block until all the given entries are replicated. Upon this method is returned,
181   * all entries that were passed in the context are assumed to be persisted in the
182   * target cluster.
183   * @param replicateContext a context where WAL entries and other
184   * parameters can be obtained.
185   */
186  boolean replicate(ReplicateContext replicateContext);
187
188
189  // The below methods are inspired by Guava Service. See
190  // https://github.com/google/guava/wiki/ServiceExplained for overview of Guava Service.
191  // Below we implement a subset only with different names on some methods so we can implement
192  // the below internally using Guava (without exposing our implementation to
193  // ReplicationEndpoint implementors.
194
195  /**
196   * Returns {@code true} if this service is RUNNING.
197   */
198  boolean isRunning();
199
200  /**
201   * @return Return {@code true} is this service is STARTING (but not yet RUNNING).
202   */
203  boolean isStarting();
204
205  /**
206   * Initiates service startup and returns immediately. A stopped service may not be restarted.
207   * Equivalent of startAsync call in Guava Service.
208   * @throws IllegalStateException if the service is not new, if it has been run already.
209   */
210  void start();
211
212  /**
213   * Waits for the {@link ReplicationEndpoint} to be up and running.
214   *
215   * @throws IllegalStateException if the service reaches a state from which it is not possible to
216   *     enter the (internal) running state. e.g. if the state is terminated when this method is
217   *     called then this will throw an IllegalStateException.
218   */
219  void awaitRunning();
220
221  /**
222   * Waits for the {@link ReplicationEndpoint} to to be up and running for no more
223   * than the given time.
224   *
225   * @param timeout the maximum time to wait
226   * @param unit the time unit of the timeout argument
227   * @throws TimeoutException if the service has not reached the given state within the deadline
228   * @throws IllegalStateException if the service reaches a state from which it is not possible to
229   *     enter the (internal) running state. e.g. if the state is terminated when this method is
230   *     called then this will throw an IllegalStateException.
231   */
232  void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException;
233
234  /**
235   * If the service is starting or running, this initiates service shutdown and returns immediately.
236   * If the service has already been stopped, this method returns immediately without taking action.
237   * Equivalent of stopAsync call in Guava Service.
238   */
239  void stop();
240
241  /**
242   * Waits for the {@link ReplicationEndpoint} to reach the terminated (internal) state.
243   *
244   * @throws IllegalStateException if the service FAILED.
245   */
246  void awaitTerminated();
247
248  /**
249   * Waits for the {@link ReplicationEndpoint} to reach a terminal state for no
250   * more than the given time.
251   *
252   * @param timeout the maximum time to wait
253   * @param unit the time unit of the timeout argument
254   * @throws TimeoutException if the service has not reached the given state within the deadline
255   * @throws IllegalStateException if the service FAILED.
256   */
257  void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException;
258
259  /**
260   * Returns the {@link Throwable} that caused this service to fail.
261   *
262   * @throws IllegalStateException if this service's state isn't FAILED.
263   */
264  Throwable failureCause();
265}