001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.ipc.CallEvent.Type.CANCELLED;
021import static org.apache.hadoop.hbase.ipc.CallEvent.Type.TIMEOUT;
022import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
023import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
024
025import org.apache.hbase.thirdparty.io.netty.handler.timeout.ReadTimeoutHandler;
026import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
027import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
028
029import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
030import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
031import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
032import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
033import org.apache.hbase.thirdparty.io.netty.channel.Channel;
034import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
035import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
036import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
037import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
038import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
039import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
040import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
041import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil;
042import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
043import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
044import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
045
046import java.io.IOException;
047import java.util.concurrent.Executors;
048import java.util.concurrent.ScheduledExecutorService;
049import java.util.concurrent.ThreadLocalRandom;
050import java.util.concurrent.TimeUnit;
051
052import org.apache.yetus.audience.InterfaceAudience;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
056import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
057import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
058import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
059import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
060import org.apache.hadoop.hbase.util.Threads;
061import org.apache.hadoop.security.UserGroupInformation;
062
063/**
064 * RPC connection implementation based on netty.
065 * <p>
 * Most operations are executed in handlers. A Netty handler is always executed in the same
 * thread (its EventLoop), so no lock is needed there.
068 * @since 2.0.0
069 */
070@InterfaceAudience.Private
071class NettyRpcConnection extends RpcConnection {
072
  private static final Logger LOG = LoggerFactory.getLogger(NettyRpcConnection.class);

  // Single-threaded scheduler, static and therefore shared by every connection, on which
  // scheduleRelogin() runs its delayed relogin attempt.
  private static final ScheduledExecutorService RELOGIN_EXECUTOR =
      Executors.newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin"));

  // The client that created this connection; it supplies the event loop group, channel class,
  // socket options and codec settings used below.
  private final NettyRpcClient rpcClient;

  // Direct buffer holding the connection preamble bytes; a retained duplicate is written right
  // after the socket connects. Released in cleanupConnection().
  private ByteBuf connectionHeaderPreamble;

  // Direct buffer holding the 4-byte serialized size followed by the serialized ConnectionHeader.
  // Released in cleanupConnection().
  private ByteBuf connectionHeaderWithLength;

  // Assigned in connect() and reset to null in shutdown0(); null means there is no channel.
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "connect is also under lock as notifyOnCancel will call our action directly")
  private Channel channel;
087
088  NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException {
089    super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
090        rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);
091    this.rpcClient = rpcClient;
092    byte[] connectionHeaderPreamble = getConnectionHeaderPreamble();
093    this.connectionHeaderPreamble =
094        Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble);
095    ConnectionHeader header = getConnectionHeader();
096    this.connectionHeaderWithLength = Unpooled.directBuffer(4 + header.getSerializedSize());
097    this.connectionHeaderWithLength.writeInt(header.getSerializedSize());
098    header.writeTo(new ByteBufOutputStream(this.connectionHeaderWithLength));
099  }
100
101  @Override
102  protected synchronized void callTimeout(Call call) {
103    if (channel != null) {
104      channel.pipeline().fireUserEventTriggered(new CallEvent(TIMEOUT, call));
105    }
106  }
107
108  @Override
109  public synchronized boolean isActive() {
110    return channel != null;
111  }
112
113  private void shutdown0() {
114    if (channel != null) {
115      channel.close();
116      channel = null;
117    }
118  }
119
120  @Override
121  public synchronized void shutdown() {
122    shutdown0();
123  }
124
125  @Override
126  public synchronized void cleanupConnection() {
127    if (connectionHeaderPreamble != null) {
128      ReferenceCountUtil.safeRelease(connectionHeaderPreamble);
129    }
130    if (connectionHeaderWithLength != null) {
131      ReferenceCountUtil.safeRelease(connectionHeaderWithLength);
132    }
133  }
134
135  private void established(Channel ch) throws IOException {
136    ChannelPipeline p = ch.pipeline();
137    String addBeforeHandler = p.context(BufferCallBeforeInitHandler.class).name();
138    p.addBefore(addBeforeHandler, null,
139      new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS));
140    p.addBefore(addBeforeHandler, null, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4));
141    p.addBefore(addBeforeHandler, null,
142      new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor));
143    p.fireUserEventTriggered(BufferCallEvent.success());
144  }
145
146  private boolean reloginInProgress;
147
148  private void scheduleRelogin(Throwable error) {
149    if (error instanceof FallbackDisallowedException) {
150      return;
151    }
152    synchronized (this) {
153      if (reloginInProgress) {
154        return;
155      }
156      reloginInProgress = true;
157      RELOGIN_EXECUTOR.schedule(new Runnable() {
158
159        @Override
160        public void run() {
161          try {
162            if (shouldAuthenticateOverKrb()) {
163              relogin();
164            }
165          } catch (IOException e) {
166            LOG.warn("Relogin failed", e);
167          }
168          synchronized (this) {
169            reloginInProgress = false;
170          }
171        }
172      }, ThreadLocalRandom.current().nextInt(reloginMaxBackoff), TimeUnit.MILLISECONDS);
173    }
174  }
175
176  private void failInit(Channel ch, IOException e) {
177    synchronized (this) {
178      // fail all pending calls
179      ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(e));
180      shutdown0();
181      return;
182    }
183  }
184
185  private void saslNegotiate(final Channel ch) {
186    UserGroupInformation ticket = getUGI();
187    if (ticket == null) {
188      failInit(ch, new FatalConnectionException("ticket/user is null"));
189      return;
190    }
191    Promise<Boolean> saslPromise = ch.eventLoop().newPromise();
192    final NettyHBaseSaslRpcClientHandler saslHandler;
193    try {
194      saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, authMethod, token,
195          serverPrincipal, rpcClient.fallbackAllowed, this.rpcClient.conf);
196    } catch (IOException e) {
197      failInit(ch, e);
198      return;
199    }
200    ch.pipeline().addFirst(new SaslChallengeDecoder(), saslHandler);
201    saslPromise.addListener(new FutureListener<Boolean>() {
202
203      @Override
204      public void operationComplete(Future<Boolean> future) throws Exception {
205        if (future.isSuccess()) {
206          ChannelPipeline p = ch.pipeline();
207          p.remove(SaslChallengeDecoder.class);
208          p.remove(NettyHBaseSaslRpcClientHandler.class);
209
210          // check if negotiate with server for connection header is necessary
211          if (saslHandler.isNeedProcessConnectionHeader()) {
212            Promise<Boolean> connectionHeaderPromise = ch.eventLoop().newPromise();
213            // create the handler to handle the connection header
214            ChannelHandler chHandler = new NettyHBaseRpcConnectionHeaderHandler(
215                connectionHeaderPromise, conf, connectionHeaderWithLength);
216
217            // add ReadTimeoutHandler to deal with server doesn't response connection header
218            // because of the different configuration in client side and server side
219            p.addFirst(
220              new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS));
221            p.addLast(chHandler);
222            connectionHeaderPromise.addListener(new FutureListener<Boolean>() {
223              @Override
224              public void operationComplete(Future<Boolean> future) throws Exception {
225                if (future.isSuccess()) {
226                  ChannelPipeline p = ch.pipeline();
227                  p.remove(ReadTimeoutHandler.class);
228                  p.remove(NettyHBaseRpcConnectionHeaderHandler.class);
229                  // don't send connection header, NettyHbaseRpcConnectionHeaderHandler
230                  // sent it already
231                  established(ch);
232                } else {
233                  final Throwable error = future.cause();
234                  scheduleRelogin(error);
235                  failInit(ch, toIOE(error));
236                }
237              }
238            });
239          } else {
240            // send the connection header to server
241            ch.write(connectionHeaderWithLength.retainedDuplicate());
242            established(ch);
243          }
244        } else {
245          final Throwable error = future.cause();
246          scheduleRelogin(error);
247          failInit(ch, toIOE(error));
248        }
249      }
250    });
251  }
252
253  private void connect() {
254    LOG.trace("Connecting to {}", remoteId.address);
255
256    this.channel = new Bootstrap().group(rpcClient.group).channel(rpcClient.channelClass)
257        .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
258        .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
259        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
260        .handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr)
261        .remoteAddress(remoteId.address).connect().addListener(new ChannelFutureListener() {
262
263          @Override
264          public void operationComplete(ChannelFuture future) throws Exception {
265            Channel ch = future.channel();
266            if (!future.isSuccess()) {
267              failInit(ch, toIOE(future.cause()));
268              rpcClient.failedServers.addToFailedServers(remoteId.address, future.cause());
269              return;
270            }
271            ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate());
272            if (useSasl) {
273              saslNegotiate(ch);
274            } else {
275              // send the connection header to server
276              ch.write(connectionHeaderWithLength.retainedDuplicate());
277              established(ch);
278            }
279          }
280        }).channel();
281  }
282
283  private void write(Channel ch, final Call call) {
284    ch.writeAndFlush(call).addListener(new ChannelFutureListener() {
285
286      @Override
287      public void operationComplete(ChannelFuture future) throws Exception {
288        // Fail the call if we failed to write it out. This usually because the channel is
289        // closed. This is needed because we may shutdown the channel inside event loop and
290        // there may still be some pending calls in the event loop queue after us.
291        if (!future.isSuccess()) {
292          call.setException(toIOE(future.cause()));
293        }
294      }
295    });
296  }
297
298  @Override
299  public synchronized void sendRequest(final Call call, HBaseRpcController hrc) throws IOException {
300    if (reloginInProgress) {
301      throw new IOException("Can not send request because relogin is in progress.");
302    }
303    hrc.notifyOnCancel(new RpcCallback<Object>() {
304
305      @Override
306      public void run(Object parameter) {
307        setCancelled(call);
308        synchronized (this) {
309          if (channel != null) {
310            channel.pipeline().fireUserEventTriggered(new CallEvent(CANCELLED, call));
311          }
312        }
313      }
314    }, new CancellationCallback() {
315
316      @Override
317      public void run(boolean cancelled) throws IOException {
318        if (cancelled) {
319          setCancelled(call);
320        } else {
321          if (channel == null) {
322            connect();
323          }
324          scheduleTimeoutTask(call);
325          final Channel ch = channel;
326          // We must move the whole writeAndFlush call inside event loop otherwise there will be a
327          // race condition.
328          // In netty's DefaultChannelPipeline, it will find the first outbound handler in the
329          // current thread and then schedule a task to event loop which will start the process from
330          // that outbound handler. It is possible that the first handler is
331          // BufferCallBeforeInitHandler when we call writeAndFlush here, but the connection is set
332          // up at the same time so in the event loop thread we remove the
333          // BufferCallBeforeInitHandler, and then our writeAndFlush task comes, still calls the
334          // write method of BufferCallBeforeInitHandler.
335          // This may be considered as a bug of netty, but anyway there is a work around so let's
336          // fix it by ourselves first.
337          if (ch.eventLoop().inEventLoop()) {
338            write(ch, call);
339          } else {
340            ch.eventLoop().execute(new Runnable() {
341
342              @Override
343              public void run() {
344                write(ch, call);
345              }
346            });
347          }
348        }
349      }
350    });
351  }
352}