1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication.regionserver;
20
21 import java.io.IOException;
22 import java.net.ConnectException;
23 import java.net.SocketTimeoutException;
24 import java.util.List;
25 import org.apache.commons.lang.StringUtils;
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.HBaseConfiguration;
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.TableNotFoundException;
33 import org.apache.hadoop.hbase.client.HConnection;
34 import org.apache.hadoop.hbase.client.HConnectionManager;
35 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
36 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
37 import org.apache.hadoop.hbase.wal.WAL.Entry;
38 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
39 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
40 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
41 import org.apache.hadoop.ipc.RemoteException;
42
43
44
45
46
47
48
49
50
51
52
53 @InterfaceAudience.Private
54 public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
55
56 private static final Log LOG = LogFactory.getLog(HBaseInterClusterReplicationEndpoint.class);
57 private HConnection conn;
58
59 private Configuration conf;
60
61
62 private long sleepForRetries;
63
64
65 private int maxRetriesMultiplier;
66
67 private int socketTimeoutMultiplier;
68
69 private MetricsSource metrics;
70
71 private ReplicationSinkManager replicationSinkMgr;
72 private boolean peersSelected = false;
73
74 @Override
75 public void init(Context context) throws IOException {
76 super.init(context);
77 this.conf = HBaseConfiguration.create(ctx.getConfiguration());
78 decorateConf();
79 this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
80 this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
81 maxRetriesMultiplier);
82
83
84
85 this.conn = HConnectionManager.createConnection(this.conf);
86 this.sleepForRetries =
87 this.conf.getLong("replication.source.sleepforretries", 1000);
88 this.metrics = context.getMetrics();
89
90 this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
91 }
92
93 private void decorateConf() {
94 String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
95 if (StringUtils.isNotEmpty(replicationCodec)) {
96 this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
97 }
98 }
99
100 private void connectToPeers() {
101 getRegionServers();
102
103 int sleepMultiplier = 1;
104
105
106 while (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) {
107 replicationSinkMgr.chooseSinks();
108 if (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) {
109 if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
110 sleepMultiplier++;
111 }
112 }
113 }
114 }
115
116
117
118
119
120
121
122 protected boolean sleepForRetries(String msg, int sleepMultiplier) {
123 try {
124 if (LOG.isTraceEnabled()) {
125 LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
126 }
127 Thread.sleep(this.sleepForRetries * sleepMultiplier);
128 } catch (InterruptedException e) {
129 LOG.debug("Interrupted while sleeping between retries");
130 }
131 return sleepMultiplier < maxRetriesMultiplier;
132 }
133
134
135
136
137 @Override
138 public boolean replicate(ReplicateContext replicateContext) {
139 List<Entry> entries = replicateContext.getEntries();
140 int sleepMultiplier = 1;
141 while (this.isRunning()) {
142 if (!peersSelected) {
143 connectToPeers();
144 peersSelected = true;
145 }
146
147 if (!isPeerEnabled()) {
148 if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
149 sleepMultiplier++;
150 }
151 continue;
152 }
153 SinkPeer sinkPeer = null;
154 try {
155 sinkPeer = replicationSinkMgr.getReplicationSink();
156 BlockingInterface rrs = sinkPeer.getRegionServer();
157 if (LOG.isTraceEnabled()) {
158 LOG.trace("Replicating " + entries.size() +
159 " entries of total size " + replicateContext.getSize());
160 }
161 ReplicationProtbufUtil.replicateWALEntry(rrs,
162 entries.toArray(new Entry[entries.size()]));
163
164
165 this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
166 replicationSinkMgr.reportSinkSuccess(sinkPeer);
167 return true;
168
169 } catch (IOException ioe) {
170
171 this.metrics.refreshAgeOfLastShippedOp();
172 if (ioe instanceof RemoteException) {
173 ioe = ((RemoteException) ioe).unwrapRemoteException();
174 LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
175 if (ioe instanceof TableNotFoundException) {
176 if (sleepForRetries("A table is missing in the peer cluster. "
177 + "Replication cannot proceed without losing data.", sleepMultiplier)) {
178 sleepMultiplier++;
179 }
180 }
181 } else {
182 if (ioe instanceof SocketTimeoutException) {
183
184
185
186 sleepForRetries("Encountered a SocketTimeoutException. Since the " +
187 "call to the remote cluster timed out, which is usually " +
188 "caused by a machine failure or a massive slowdown",
189 this.socketTimeoutMultiplier);
190 } else if (ioe instanceof ConnectException) {
191 LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
192 replicationSinkMgr.chooseSinks();
193 } else {
194 LOG.warn("Can't replicate because of a local or network error: ", ioe);
195 }
196 }
197
198 if (sinkPeer != null) {
199 replicationSinkMgr.reportBadSink(sinkPeer);
200 }
201 if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
202 sleepMultiplier++;
203 }
204 }
205 }
206 return false;
207 }
208
209 protected boolean isPeerEnabled() {
210 return ctx.getReplicationPeer().getPeerState() == PeerState.ENABLED;
211 }
212
213 @Override
214 protected void doStop() {
215 disconnect();
216 if (this.conn != null) {
217 try {
218 this.conn.close();
219 this.conn = null;
220 } catch (IOException e) {
221 LOG.warn("Failed to close the connection");
222 }
223 }
224 notifyStopped();
225 }
226 }