001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.io.InputStream;
022import java.net.InetAddress;
023import java.net.Socket;
024import java.nio.ByteBuffer;
025import java.nio.channels.ReadableByteChannel;
026import java.nio.channels.SocketChannel;
027import java.util.concurrent.ConcurrentLinkedDeque;
028import java.util.concurrent.atomic.LongAdder;
029import java.util.concurrent.locks.Lock;
030import java.util.concurrent.locks.ReentrantLock;
031
032import org.apache.hadoop.hbase.CellScanner;
033import org.apache.hadoop.hbase.DoNotRetryIOException;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.apache.hadoop.hbase.client.VersionInfoUtil;
036import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
037import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
038import org.apache.hadoop.hbase.nio.ByteBuff;
039import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
040import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
041import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
042import org.apache.hbase.thirdparty.com.google.protobuf.Message;
043import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
044import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
045
046/** Reads calls from a connection and queues them for handling. */
047@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
048    justification = "False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
049@InterfaceAudience.Private
050class SimpleServerRpcConnection extends ServerRpcConnection {
051
052  final SocketChannel channel;
053  private ByteBuff data;
054  private ByteBuffer dataLengthBuffer;
055  private ByteBuffer preambleBuffer;
056  private final LongAdder rpcCount = new LongAdder(); // number of outstanding rpcs
057  private long lastContact;
058  private final Socket socket;
059  final SimpleRpcServerResponder responder;
060
061  // If initial preamble with version and magic has been read or not.
062  private boolean connectionPreambleRead = false;
063
064  final ConcurrentLinkedDeque<RpcResponse> responseQueue = new ConcurrentLinkedDeque<>();
065  final Lock responseWriteLock = new ReentrantLock();
066  long lastSentTime = -1L;
067
068  public SimpleServerRpcConnection(SimpleRpcServer rpcServer, SocketChannel channel,
069      long lastContact) {
070    super(rpcServer);
071    this.channel = channel;
072    this.lastContact = lastContact;
073    this.data = null;
074    this.dataLengthBuffer = ByteBuffer.allocate(4);
075    this.socket = channel.socket();
076    this.addr = socket.getInetAddress();
077    if (addr == null) {
078      this.hostAddress = "*Unknown*";
079    } else {
080      this.hostAddress = addr.getHostAddress();
081    }
082    this.remotePort = socket.getPort();
083    if (rpcServer.socketSendBufferSize != 0) {
084      try {
085        socket.setSendBufferSize(rpcServer.socketSendBufferSize);
086      } catch (IOException e) {
087        SimpleRpcServer.LOG.warn(
088          "Connection: unable to set socket send buffer size to " + rpcServer.socketSendBufferSize);
089      }
090    }
091    this.responder = rpcServer.responder;
092  }
093
094  public void setLastContact(long lastContact) {
095    this.lastContact = lastContact;
096  }
097
098  public long getLastContact() {
099    return lastContact;
100  }
101
102  /* Return true if the connection has no outstanding rpc */
103  boolean isIdle() {
104    return rpcCount.sum() == 0;
105  }
106
107  /* Decrement the outstanding RPC count */
108  protected void decRpcCount() {
109    rpcCount.decrement();
110  }
111
112  /* Increment the outstanding RPC count */
113  protected void incRpcCount() {
114    rpcCount.increment();
115  }
116
117  private int readPreamble() throws IOException {
118    if (preambleBuffer == null) {
119      preambleBuffer = ByteBuffer.allocate(6);
120    }
121    int count = this.rpcServer.channelRead(channel, preambleBuffer);
122    if (count < 0 || preambleBuffer.remaining() > 0) {
123      return count;
124    }
125    preambleBuffer.flip();
126    if (!processPreamble(preambleBuffer)) {
127      return -1;
128    }
129    preambleBuffer = null; // do not need it anymore
130    connectionPreambleRead = true;
131    return count;
132  }
133
134  private int read4Bytes() throws IOException {
135    if (this.dataLengthBuffer.remaining() > 0) {
136      return this.rpcServer.channelRead(channel, this.dataLengthBuffer);
137    } else {
138      return 0;
139    }
140  }
141
142  /**
143   * Read off the wire. If there is not enough data to read, update the connection state with what
144   * we have and returns.
145   * @return Returns -1 if failure (and caller will close connection), else zero or more.
146   * @throws IOException
147   * @throws InterruptedException
148   */
149  public int readAndProcess() throws IOException, InterruptedException {
150    // If we have not read the connection setup preamble, look to see if that is on the wire.
151    if (!connectionPreambleRead) {
152      int count = readPreamble();
153      if (!connectionPreambleRead) {
154        return count;
155      }
156    }
157
158    // Try and read in an int. it will be length of the data to read (or -1 if a ping). We catch the
159    // integer length into the 4-byte this.dataLengthBuffer.
160    int count = read4Bytes();
161    if (count < 0 || dataLengthBuffer.remaining() > 0) {
162      return count;
163    }
164
165    // We have read a length and we have read the preamble. It is either the connection header
166    // or it is a request.
167    if (data == null) {
168      dataLengthBuffer.flip();
169      int dataLength = dataLengthBuffer.getInt();
170      if (dataLength == RpcClient.PING_CALL_ID) {
171        if (!useWrap) { // covers the !useSasl too
172          dataLengthBuffer.clear();
173          return 0; // ping message
174        }
175      }
176      if (dataLength < 0) { // A data length of zero is legal.
177        throw new DoNotRetryIOException(
178            "Unexpected data length " + dataLength + "!! from " + getHostAddress());
179      }
180
181      if (dataLength > this.rpcServer.maxRequestSize) {
182        String msg = "RPC data length of " + dataLength + " received from " + getHostAddress() +
183            " is greater than max allowed " + this.rpcServer.maxRequestSize + ". Set \"" +
184            SimpleRpcServer.MAX_REQUEST_SIZE +
185            "\" on server to override this limit (not recommended)";
186        SimpleRpcServer.LOG.warn(msg);
187
188        if (connectionHeaderRead && connectionPreambleRead) {
189          incRpcCount();
190          // Construct InputStream for the non-blocking SocketChannel
191          // We need the InputStream because we want to read only the request header
192          // instead of the whole rpc.
193          ByteBuffer buf = ByteBuffer.allocate(1);
194          InputStream is = new InputStream() {
195            @Override
196            public int read() throws IOException {
197              SimpleServerRpcConnection.this.rpcServer.channelRead(channel, buf);
198              buf.flip();
199              int x = buf.get();
200              buf.flip();
201              return x;
202            }
203          };
204          CodedInputStream cis = CodedInputStream.newInstance(is);
205          int headerSize = cis.readRawVarint32();
206          Message.Builder builder = RequestHeader.newBuilder();
207          ProtobufUtil.mergeFrom(builder, cis, headerSize);
208          RequestHeader header = (RequestHeader) builder.build();
209
210          // Notify the client about the offending request
211          SimpleServerCall reqTooBig = new SimpleServerCall(header.getCallId(), this.service, null,
212              null, null, null, this, 0, this.addr, System.currentTimeMillis(), 0,
213              this.rpcServer.bbAllocator, this.rpcServer.cellBlockBuilder, null, responder);
214          this.rpcServer.metrics.exception(SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION);
215          // Make sure the client recognizes the underlying exception
216          // Otherwise, throw a DoNotRetryIOException.
217          if (VersionInfoUtil.hasMinimumVersion(connectionHeader.getVersionInfo(),
218            RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION)) {
219            reqTooBig.setResponse(null, null, SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION, msg);
220          } else {
221            reqTooBig.setResponse(null, null, new DoNotRetryIOException(), msg);
222          }
223          // In most cases we will write out the response directly. If not, it is still OK to just
224          // close the connection without writing out the reqTooBig response. Do not try to write
225          // out directly here, and it will cause deserialization error if the connection is slow
226          // and we have a half writing response in the queue.
227          reqTooBig.sendResponseIfReady();
228        }
229        // Close the connection
230        return -1;
231      }
232
233      // Initialize this.data with a ByteBuff.
234      // This call will allocate a ByteBuff to read request into and assign to this.data
235      // Also when we use some buffer(s) from pool, it will create a CallCleanup instance also and
236      // assign to this.callCleanup
237      initByteBuffToReadInto(dataLength);
238
239      // Increment the rpc count. This counter will be decreased when we write
240      // the response. If we want the connection to be detected as idle properly, we
241      // need to keep the inc / dec correct.
242      incRpcCount();
243    }
244
245    count = channelDataRead(channel, data);
246
247    if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
248      process();
249    }
250
251    return count;
252  }
253
254  // It creates the ByteBuff and CallCleanup and assign to Connection instance.
255  private void initByteBuffToReadInto(int length) {
256    this.data = rpcServer.bbAllocator.allocate(length);
257    this.callCleanup = data::release;
258  }
259
260  protected int channelDataRead(ReadableByteChannel channel, ByteBuff buf) throws IOException {
261    int count = buf.read(channel);
262    if (count > 0) {
263      this.rpcServer.metrics.receivedBytes(count);
264    }
265    return count;
266  }
267
268  /**
269   * Process the data buffer and clean the connection state for the next call.
270   */
271  private void process() throws IOException, InterruptedException {
272    data.rewind();
273    try {
274      if (skipInitialSaslHandshake) {
275        skipInitialSaslHandshake = false;
276        return;
277      }
278
279      if (useSasl) {
280        saslReadAndProcess(data);
281      } else {
282        processOneRpc(data);
283      }
284
285    } finally {
286      dataLengthBuffer.clear(); // Clean for the next call
287      data = null; // For the GC
288      this.callCleanup = null;
289    }
290  }
291
292  @Override
293  public synchronized void close() {
294    disposeSasl();
295    data = null;
296    callCleanup = null;
297    if (!channel.isOpen()) return;
298    try {
299      socket.shutdownOutput();
300    } catch (Exception ignored) {
301      if (SimpleRpcServer.LOG.isTraceEnabled()) {
302        SimpleRpcServer.LOG.trace("Ignored exception", ignored);
303      }
304    }
305    if (channel.isOpen()) {
306      try {
307        channel.close();
308      } catch (Exception ignored) {
309      }
310    }
311    try {
312      socket.close();
313    } catch (Exception ignored) {
314      if (SimpleRpcServer.LOG.isTraceEnabled()) {
315        SimpleRpcServer.LOG.trace("Ignored exception", ignored);
316      }
317    }
318  }
319
320  @Override
321  public boolean isConnectionOpen() {
322    return channel.isOpen();
323  }
324
325  @Override
326  public SimpleServerCall createCall(int id, BlockingService service, MethodDescriptor md,
327      RequestHeader header, Message param, CellScanner cellScanner, long size,
328      InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) {
329    return new SimpleServerCall(id, service, md, header, param, cellScanner, this, size,
330        remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.bbAllocator,
331        this.rpcServer.cellBlockBuilder, reqCleanup, this.responder);
332  }
333
334  @Override
335  protected void doRespond(RpcResponse resp) throws IOException {
336    responder.doRespond(this, resp);
337  }
338}