001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.UUID;
023import java.util.concurrent.TimeUnit;
024import java.util.concurrent.TimeoutException;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.hbase.Abortable;
028import org.apache.hadoop.hbase.HBaseInterfaceAudience;
029import org.apache.hadoop.hbase.Server;
030import org.apache.hadoop.hbase.TableDescriptors;
031import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
032import org.apache.hadoop.hbase.wal.WAL.Entry;
033import org.apache.yetus.audience.InterfaceAudience;
034
035/**
036 * ReplicationEndpoint is a plugin which implements replication to other HBase clusters, or other
037 * systems. ReplicationEndpoint implementation can be specified at the peer creation time by
038 * specifying it in the {@link ReplicationPeerConfig}. A ReplicationEndpoint is run in a thread in
039 * each region server in the same process.
040 * <p>
041 * ReplicationEndpoint is closely tied to ReplicationSource in a producer-consumer relation.
042 * ReplicationSource is an HBase-private class which tails the logs and manages the queue of logs
043 * plus management and persistence of all the state for replication. ReplicationEndpoint on the
044 * other hand is responsible for doing the actual shipping and persisting of the WAL entries in the
045 * other cluster.
046 */
047@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
048public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
049  // TODO: This class needs doc. Has a Context and a ReplicationContext. Then has #start, #stop.
050  // How they relate? Do we #start before #init(Context)? We fail fast if you don't?
051
052  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
053  class Context {
054    private final Server server;
055    private final Configuration localConf;
056    private final Configuration conf;
057    private final FileSystem fs;
058    private final TableDescriptors tableDescriptors;
059    private final ReplicationPeer replicationPeer;
060    private final String peerId;
061    private final UUID clusterId;
062    private final MetricsSource metrics;
063    private final Abortable abortable;
064
065    @InterfaceAudience.Private
066    public Context(final Server server, final Configuration localConf, final Configuration conf,
067      final FileSystem fs, final String peerId, final UUID clusterId,
068      final ReplicationPeer replicationPeer, final MetricsSource metrics,
069      final TableDescriptors tableDescriptors, final Abortable abortable) {
070      this.server = server;
071      this.localConf = localConf;
072      this.conf = conf;
073      this.fs = fs;
074      this.clusterId = clusterId;
075      this.peerId = peerId;
076      this.replicationPeer = replicationPeer;
077      this.metrics = metrics;
078      this.tableDescriptors = tableDescriptors;
079      this.abortable = abortable;
080    }
081
082    public Server getServer() {
083      return server;
084    }
085
086    public Configuration getConfiguration() {
087      return conf;
088    }
089
090    public Configuration getLocalConfiguration() {
091      return localConf;
092    }
093
094    public FileSystem getFilesystem() {
095      return fs;
096    }
097
098    public UUID getClusterId() {
099      return clusterId;
100    }
101
102    public String getPeerId() {
103      return peerId;
104    }
105
106    public ReplicationPeerConfig getPeerConfig() {
107      return replicationPeer.getPeerConfig();
108    }
109
110    public ReplicationPeer getReplicationPeer() {
111      return replicationPeer;
112    }
113
114    public MetricsSource getMetrics() {
115      return metrics;
116    }
117
118    public TableDescriptors getTableDescriptors() {
119      return tableDescriptors;
120    }
121
122    public Abortable getAbortable() {
123      return abortable;
124    }
125  }
126
  /**
   * Initialize the replication endpoint with the given context.
   * @param context the {@link Context} this endpoint is initialized from
   * @throws IOException if an error occurs while initializing the endpoint
   */
  void init(Context context) throws IOException;
133
  /**
   * Whether or not the replication endpoint can replicate to its source cluster, i.e. to a cluster
   * with the same UUID.
   * @return {@code true} if replicating to the cluster with the same UUID is allowed
   */
  boolean canReplicateToSameCluster();
139
  /**
   * Returns a UUID of the provided peer id. Every HBase cluster instance has a persisted,
   * associated UUID. If the replication is not performed to an actual HBase cluster (but to some
   * other system), the UUID returned has to uniquely identify the connected target system.
   * @return a UUID, or {@code null} if the peer cluster does not exist or is not connected.
   */
  UUID getPeerUUID();
147
  /**
   * Returns a WALEntryFilter to use for filtering out WALEntries from the log. Replication
   * infrastructure will call this filter before sending the edits to {@code shipEdits()}.
   * @return a {@link WALEntryFilter} or {@code null}.
   */
  WALEntryFilter getWALEntryfilter();
154
155  /**
156   * A context for {@link ReplicationEndpoint#replicate(ReplicateContext)} method.
157   */
158  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
159  static class ReplicateContext {
160    List<Entry> entries;
161    int size;
162    String walGroupId;
163    int timeout;
164
165    @InterfaceAudience.Private
166    public ReplicateContext() {
167    }
168
169    public ReplicateContext setEntries(List<Entry> entries) {
170      this.entries = entries;
171      return this;
172    }
173
174    public ReplicateContext setSize(int size) {
175      this.size = size;
176      return this;
177    }
178
179    public ReplicateContext setWalGroupId(String walGroupId) {
180      this.walGroupId = walGroupId;
181      return this;
182    }
183
184    public List<Entry> getEntries() {
185      return entries;
186    }
187
188    public int getSize() {
189      return size;
190    }
191
192    public String getWalGroupId() {
193      return walGroupId;
194    }
195
196    public void setTimeout(int timeout) {
197      this.timeout = timeout;
198    }
199
200    public int getTimeout() {
201      return this.timeout;
202    }
203  }
204
  /**
   * Replicate the given set of entries (in the context) to the other cluster. Can block until all
   * the given entries are replicated. Once this method returns, all entries that were passed in
   * the context are assumed to be persisted in the target cluster.
   * @param replicateContext a context where WAL entries and other parameters can be obtained.
   * @return NOTE(review): the meaning of the boolean is not documented in this file; presumably
   *         {@code true} signals that the entries were replicated successfully - confirm against
   *         the callers.
   */
  boolean replicate(ReplicateContext replicateContext);
212
  // The below methods are inspired by Guava Service. See
  // https://github.com/google/guava/wiki/ServiceExplained for overview of Guava Service.
  // Below we implement a subset only with different names on some methods so we can implement
  // the below internally using Guava (without exposing our implementation to
  // ReplicationEndpoint implementors).
218
  /**
   * Returns {@code true} if this service is RUNNING. See {@link #isStarting()} for the state that
   * comes before RUNNING.
   */
  boolean isRunning();
223
  /** Returns {@code true} if this service is STARTING (but not yet RUNNING). */
  boolean isStarting();
226
  /**
   * Initiates service startup and returns immediately. A stopped service may not be restarted.
   * Equivalent of startAsync call in Guava Service. Use {@link #awaitRunning()} to wait until the
   * endpoint is up and running.
   * @throws IllegalStateException if the service is not new, i.e. if it has been run already.
   */
  void start();
233
  /**
   * Waits for the {@link ReplicationEndpoint} to be up and running. This variant takes no timeout;
   * see {@link #awaitRunning(long, TimeUnit)} for the time-limited one.
   * @throws IllegalStateException if the service reaches a state from which it is not possible to
   *                               enter the (internal) running state. e.g. if the state is
   *                               terminated when this method is called then this will throw an
   *                               IllegalStateException.
   */
  void awaitRunning();
242
  /**
   * Waits for the {@link ReplicationEndpoint} to be up and running for no more than the given
   * time.
   * @param timeout the maximum time to wait
   * @param unit    the time unit of the timeout argument
   * @throws TimeoutException      if the service has not reached the running state within the
   *                               deadline
   * @throws IllegalStateException if the service reaches a state from which it is not possible to
   *                               enter the (internal) running state. e.g. if the state is
   *                               terminated when this method is called then this will throw an
   *                               IllegalStateException.
   */
  void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException;
256
  /**
   * If the service is starting or running, this initiates service shutdown and returns immediately.
   * If the service has already been stopped, this method returns immediately without taking action.
   * Equivalent of stopAsync call in Guava Service. Use {@link #awaitTerminated()} to wait for the
   * terminated state.
   */
  void stop();
263
  /**
   * Waits for the {@link ReplicationEndpoint} to reach the terminated (internal) state. This
   * variant takes no timeout; see {@link #awaitTerminated(long, TimeUnit)} for the time-limited
   * one.
   * @throws IllegalStateException if the service FAILED; {@link #failureCause()} returns the
   *                               cause of the failure.
   */
  void awaitTerminated();
269
  /**
   * Waits for the {@link ReplicationEndpoint} to reach a terminal state for no more than the given
   * time.
   * @param timeout the maximum time to wait
   * @param unit    the time unit of the timeout argument
   * @throws TimeoutException      if the service has not reached a terminal state within the
   *                               deadline
   * @throws IllegalStateException if the service FAILED.
   */
  void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException;
280
  /**
   * Returns the {@link Throwable} that caused this service to fail.
   * @return the cause of the failure
   * @throws IllegalStateException if this service's state isn't FAILED.
   */
  Throwable failureCause();
286}