
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
import org.apache.hadoop.ipc.RemoteException;

/**
 * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint}
 * implementation for replicating to another HBase cluster.
 * For the slave cluster it selects a random number of peers
 * using a replication ratio. For example, if the replication ratio is 0.1
 * and the slave cluster has 100 region servers, 10 will be selected.
 * <p/>
 * A stream is considered down when we cannot contact a region server on the
 * peer cluster for more than 55 seconds by default.
 */
@InterfaceAudience.Private
public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {

  private static final Log LOG = LogFactory.getLog(HBaseInterClusterReplicationEndpoint.class);
  private HConnection conn;

  private Configuration conf;

  // How long should we sleep for each retry
  private long sleepForRetries;

  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Socket timeouts require even bolder actions since we don't want to DDOS
  private int socketTimeoutMultiplier;
  // Metrics for this source
  private MetricsSource metrics;
  // Handles connecting to peer region servers
  private ReplicationSinkManager replicationSinkMgr;
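  // Whether sinks have already been chosen; set on the first call to replicate()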
  private boolean peersSelected = false;

  @Override
  public void init(Context context) throws IOException {
    super.init(context);
    this.conf = HBaseConfiguration.create(ctx.getConfiguration());
    decorateConf();
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
        maxRetriesMultiplier);
    // TODO: Make this connection replication-specific, so that replication-specific settings
    // such as the compression or codec to use when passing Cells can be applied to it.
    this.conn = HConnectionManager.createConnection(this.conf);
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);
    this.metrics = context.getMetrics();
    // ReplicationQueueInfo parses the peerId out of the znode for us
    this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
  }

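  /**
   * Use the replication codec (if one is configured) as the RPC codec for the
   * connection created from this conf, so cells are shipped with that encoding.
   */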
  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

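  /**
   * Block until the sink manager has chosen at least one peer region server to
   * replicate to, retrying with backoff, or until this endpoint is stopped.
   */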
  private void connectToPeers() {
    getRegionServers();

    int sleepMultiplier = 1;

    // Connect to peer cluster first, unless we have to stop
    while (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) {
      replicationSinkMgr.chooseSinks();
      if (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) {
        if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
  }

  /**
   * Do the sleeping logic
   * @param msg Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      LOG.debug("Interrupted while sleeping between retries");
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

  /**
   * Do the shipping logic
   */
  @Override
  public boolean replicate(ReplicateContext replicateContext) {
    List<Entry> entries = replicateContext.getEntries();
    int sleepMultiplier = 1;
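    // Retry this batch against a sink chosen by the sink manager until it ships
    // successfully or the endpoint is stopped; sinks are selected lazily on first use.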
    while (this.isRunning()) {
      if (!peersSelected) {
        connectToPeers();
        peersSelected = true;
      }

      if (!isPeerEnabled()) {
        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      SinkPeer sinkPeer = null;
      try {
        sinkPeer = replicationSinkMgr.getReplicationSink();
        BlockingInterface rrs = sinkPeer.getRegionServer();
        if (LOG.isTraceEnabled()) {
          LOG.trace("Replicating " + entries.size() +
              " entries of total size " + replicateContext.getSize());
        }
        ReplicationProtbufUtil.replicateWALEntry(rrs,
            entries.toArray(new Entry[entries.size()]));

        // update metrics
        this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
        replicationSinkMgr.reportSinkSuccess(sinkPeer);
        return true;

      } catch (IOException ioe) {
        // Didn't ship anything, but must still age the last time we did
        this.metrics.refreshAgeOfLastShippedOp();
        if (ioe instanceof RemoteException) {
          ioe = ((RemoteException) ioe).unwrapRemoteException();
          LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
          if (ioe instanceof TableNotFoundException) {
            if (sleepForRetries("A table is missing in the peer cluster. "
                + "Replication cannot proceed without losing data.", sleepMultiplier)) {
              sleepMultiplier++;
            }
          }
        } else {
          if (ioe instanceof SocketTimeoutException) {
            // This exception means we waited for more than 60s and nothing happened;
            // the cluster is alive, and calling it right away, even for a test,
            // just makes things worse.
            sleepForRetries("Encountered a SocketTimeoutException. The call to the " +
              "remote cluster timed out, which is usually " +
              "caused by a machine failure or a massive slowdown",
              this.socketTimeoutMultiplier);
          } else if (ioe instanceof ConnectException) {
            LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
            replicationSinkMgr.chooseSinks();
          } else {
            LOG.warn("Can't replicate because of a local or network error: ", ioe);
          }
        }

        if (sinkPeer != null) {
          replicationSinkMgr.reportBadSink(sinkPeer);
        }
        if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
    return false; // in case we exited before replicating
  }

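  /**
   * @return true if the peer is enabled, otherwise false
   */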
  protected boolean isPeerEnabled() {
    return ctx.getReplicationPeer().getPeerState() == PeerState.ENABLED;
  }

  @Override
  protected void doStop() {
    disconnect(); // don't call super.doStop()
    if (this.conn != null) {
      try {
        this.conn.close();
        this.conn = null;
      } catch (IOException e) {
        LOG.warn("Failed to close the connection");
      }
    }
    notifyStopped();
  }
}