001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.io.InputStream;
022import java.net.InetAddress;
023import java.net.Socket;
024import java.nio.ByteBuffer;
025import java.nio.channels.ReadableByteChannel;
026import java.nio.channels.SocketChannel;
027import java.util.concurrent.ConcurrentLinkedDeque;
028import java.util.concurrent.atomic.LongAdder;
029import java.util.concurrent.locks.Lock;
030import java.util.concurrent.locks.ReentrantLock;
031
032import org.apache.hadoop.hbase.CellScanner;
033import org.apache.hadoop.hbase.DoNotRetryIOException;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.apache.hadoop.hbase.client.VersionInfoUtil;
036import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
037import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
038import org.apache.hadoop.hbase.nio.ByteBuff;
039import org.apache.hadoop.hbase.nio.SingleByteBuff;
040import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
041import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
042import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
043import org.apache.hbase.thirdparty.com.google.protobuf.Message;
044import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
045import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
046import org.apache.hadoop.hbase.util.Pair;
047
048/** Reads calls from a connection and queues them for handling. */
049@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
050    justification = "False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
051@InterfaceAudience.Private
052class SimpleServerRpcConnection extends ServerRpcConnection {
053
054  final SocketChannel channel;
055  private ByteBuff data;
056  private ByteBuffer dataLengthBuffer;
057  private ByteBuffer preambleBuffer;
058  private final LongAdder rpcCount = new LongAdder(); // number of outstanding rpcs
059  private long lastContact;
060  private final Socket socket;
061  final SimpleRpcServerResponder responder;
062
063  // If initial preamble with version and magic has been read or not.
064  private boolean connectionPreambleRead = false;
065
066  final ConcurrentLinkedDeque<RpcResponse> responseQueue = new ConcurrentLinkedDeque<>();
067  final Lock responseWriteLock = new ReentrantLock();
068  long lastSentTime = -1L;
069
070  public SimpleServerRpcConnection(SimpleRpcServer rpcServer, SocketChannel channel,
071      long lastContact) {
072    super(rpcServer);
073    this.channel = channel;
074    this.lastContact = lastContact;
075    this.data = null;
076    this.dataLengthBuffer = ByteBuffer.allocate(4);
077    this.socket = channel.socket();
078    this.addr = socket.getInetAddress();
079    if (addr == null) {
080      this.hostAddress = "*Unknown*";
081    } else {
082      this.hostAddress = addr.getHostAddress();
083    }
084    this.remotePort = socket.getPort();
085    if (rpcServer.socketSendBufferSize != 0) {
086      try {
087        socket.setSendBufferSize(rpcServer.socketSendBufferSize);
088      } catch (IOException e) {
089        SimpleRpcServer.LOG.warn(
090          "Connection: unable to set socket send buffer size to " + rpcServer.socketSendBufferSize);
091      }
092    }
093    this.responder = rpcServer.responder;
094  }
095
096  public void setLastContact(long lastContact) {
097    this.lastContact = lastContact;
098  }
099
100  public long getLastContact() {
101    return lastContact;
102  }
103
104  /* Return true if the connection has no outstanding rpc */
105  boolean isIdle() {
106    return rpcCount.sum() == 0;
107  }
108
109  /* Decrement the outstanding RPC count */
110  protected void decRpcCount() {
111    rpcCount.decrement();
112  }
113
114  /* Increment the outstanding RPC count */
115  protected void incRpcCount() {
116    rpcCount.increment();
117  }
118
119  private int readPreamble() throws IOException {
120    if (preambleBuffer == null) {
121      preambleBuffer = ByteBuffer.allocate(6);
122    }
123    int count = this.rpcServer.channelRead(channel, preambleBuffer);
124    if (count < 0 || preambleBuffer.remaining() > 0) {
125      return count;
126    }
127    preambleBuffer.flip();
128    if (!processPreamble(preambleBuffer)) {
129      return -1;
130    }
131    preambleBuffer = null; // do not need it anymore
132    connectionPreambleRead = true;
133    return count;
134  }
135
136  private int read4Bytes() throws IOException {
137    if (this.dataLengthBuffer.remaining() > 0) {
138      return this.rpcServer.channelRead(channel, this.dataLengthBuffer);
139    } else {
140      return 0;
141    }
142  }
143
144  /**
145   * Read off the wire. If there is not enough data to read, update the connection state with what
146   * we have and returns.
147   * @return Returns -1 if failure (and caller will close connection), else zero or more.
148   * @throws IOException
149   * @throws InterruptedException
150   */
151  public int readAndProcess() throws IOException, InterruptedException {
152    // If we have not read the connection setup preamble, look to see if that is on the wire.
153    if (!connectionPreambleRead) {
154      int count = readPreamble();
155      if (!connectionPreambleRead) {
156        return count;
157      }
158    }
159
160    // Try and read in an int. it will be length of the data to read (or -1 if a ping). We catch the
161    // integer length into the 4-byte this.dataLengthBuffer.
162    int count = read4Bytes();
163    if (count < 0 || dataLengthBuffer.remaining() > 0) {
164      return count;
165    }
166
167    // We have read a length and we have read the preamble. It is either the connection header
168    // or it is a request.
169    if (data == null) {
170      dataLengthBuffer.flip();
171      int dataLength = dataLengthBuffer.getInt();
172      if (dataLength == RpcClient.PING_CALL_ID) {
173        if (!useWrap) { // covers the !useSasl too
174          dataLengthBuffer.clear();
175          return 0; // ping message
176        }
177      }
178      if (dataLength < 0) { // A data length of zero is legal.
179        throw new DoNotRetryIOException(
180            "Unexpected data length " + dataLength + "!! from " + getHostAddress());
181      }
182
183      if (dataLength > this.rpcServer.maxRequestSize) {
184        String msg = "RPC data length of " + dataLength + " received from " + getHostAddress() +
185            " is greater than max allowed " + this.rpcServer.maxRequestSize + ". Set \"" +
186            SimpleRpcServer.MAX_REQUEST_SIZE +
187            "\" on server to override this limit (not recommended)";
188        SimpleRpcServer.LOG.warn(msg);
189
190        if (connectionHeaderRead && connectionPreambleRead) {
191          incRpcCount();
192          // Construct InputStream for the non-blocking SocketChannel
193          // We need the InputStream because we want to read only the request header
194          // instead of the whole rpc.
195          ByteBuffer buf = ByteBuffer.allocate(1);
196          InputStream is = new InputStream() {
197            @Override
198            public int read() throws IOException {
199              SimpleServerRpcConnection.this.rpcServer.channelRead(channel, buf);
200              buf.flip();
201              int x = buf.get();
202              buf.flip();
203              return x;
204            }
205          };
206          CodedInputStream cis = CodedInputStream.newInstance(is);
207          int headerSize = cis.readRawVarint32();
208          Message.Builder builder = RequestHeader.newBuilder();
209          ProtobufUtil.mergeFrom(builder, cis, headerSize);
210          RequestHeader header = (RequestHeader) builder.build();
211
212          // Notify the client about the offending request
213          SimpleServerCall reqTooBig = new SimpleServerCall(header.getCallId(), this.service, null,
214              null, null, null, this, 0, this.addr, System.currentTimeMillis(), 0,
215              this.rpcServer.reservoir, this.rpcServer.cellBlockBuilder, null, responder);
216          this.rpcServer.metrics.exception(SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION);
217          // Make sure the client recognizes the underlying exception
218          // Otherwise, throw a DoNotRetryIOException.
219          if (VersionInfoUtil.hasMinimumVersion(connectionHeader.getVersionInfo(),
220            RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION)) {
221            reqTooBig.setResponse(null, null, SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION, msg);
222          } else {
223            reqTooBig.setResponse(null, null, new DoNotRetryIOException(), msg);
224          }
225          // In most cases we will write out the response directly. If not, it is still OK to just
226          // close the connection without writing out the reqTooBig response. Do not try to write
227          // out directly here, and it will cause deserialization error if the connection is slow
228          // and we have a half writing response in the queue.
229          reqTooBig.sendResponseIfReady();
230        }
231        // Close the connection
232        return -1;
233      }
234
235      // Initialize this.data with a ByteBuff.
236      // This call will allocate a ByteBuff to read request into and assign to this.data
237      // Also when we use some buffer(s) from pool, it will create a CallCleanup instance also and
238      // assign to this.callCleanup
239      initByteBuffToReadInto(dataLength);
240
241      // Increment the rpc count. This counter will be decreased when we write
242      // the response. If we want the connection to be detected as idle properly, we
243      // need to keep the inc / dec correct.
244      incRpcCount();
245    }
246
247    count = channelDataRead(channel, data);
248
249    if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
250      process();
251    }
252
253    return count;
254  }
255
256  // It creates the ByteBuff and CallCleanup and assign to Connection instance.
257  private void initByteBuffToReadInto(int length) {
258    // We create random on heap buffers are read into those when
259    // 1. ByteBufferPool is not there.
260    // 2. When the size of the req is very small. Using a large sized (64 KB) buffer from pool is
261    // waste then. Also if all the reqs are of this size, we will be creating larger sized
262    // buffers and pool them permanently. This include Scan/Get request and DDL kind of reqs like
263    // RegionOpen.
264    // 3. If it is an initial handshake signal or initial connection request. Any way then
265    // condition 2 itself will match
266    // 4. When SASL use is ON.
267    if (this.rpcServer.reservoir == null || skipInitialSaslHandshake || !connectionHeaderRead ||
268        useSasl || length < this.rpcServer.minSizeForReservoirUse) {
269      this.data = new SingleByteBuff(ByteBuffer.allocate(length));
270    } else {
271      Pair<ByteBuff, CallCleanup> pair = RpcServer.allocateByteBuffToReadInto(
272        this.rpcServer.reservoir, this.rpcServer.minSizeForReservoirUse, length);
273      this.data = pair.getFirst();
274      this.callCleanup = pair.getSecond();
275    }
276  }
277
278  protected int channelDataRead(ReadableByteChannel channel, ByteBuff buf) throws IOException {
279    int count = buf.read(channel);
280    if (count > 0) {
281      this.rpcServer.metrics.receivedBytes(count);
282    }
283    return count;
284  }
285
286  /**
287   * Process the data buffer and clean the connection state for the next call.
288   */
289  private void process() throws IOException, InterruptedException {
290    data.rewind();
291    try {
292      if (skipInitialSaslHandshake) {
293        skipInitialSaslHandshake = false;
294        return;
295      }
296
297      if (useSasl) {
298        saslReadAndProcess(data);
299      } else {
300        processOneRpc(data);
301      }
302
303    } finally {
304      dataLengthBuffer.clear(); // Clean for the next call
305      data = null; // For the GC
306      this.callCleanup = null;
307    }
308  }
309
310  @Override
311  public synchronized void close() {
312    disposeSasl();
313    data = null;
314    callCleanup = null;
315    if (!channel.isOpen()) return;
316    try {
317      socket.shutdownOutput();
318    } catch (Exception ignored) {
319      if (SimpleRpcServer.LOG.isTraceEnabled()) {
320        SimpleRpcServer.LOG.trace("Ignored exception", ignored);
321      }
322    }
323    if (channel.isOpen()) {
324      try {
325        channel.close();
326      } catch (Exception ignored) {
327      }
328    }
329    try {
330      socket.close();
331    } catch (Exception ignored) {
332      if (SimpleRpcServer.LOG.isTraceEnabled()) {
333        SimpleRpcServer.LOG.trace("Ignored exception", ignored);
334      }
335    }
336  }
337
338  @Override
339  public boolean isConnectionOpen() {
340    return channel.isOpen();
341  }
342
343  @Override
344  public SimpleServerCall createCall(int id, BlockingService service, MethodDescriptor md,
345      RequestHeader header, Message param, CellScanner cellScanner, long size,
346      InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) {
347    return new SimpleServerCall(id, service, md, header, param, cellScanner, this, size,
348        remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.reservoir,
349        this.rpcServer.cellBlockBuilder, reqCleanup, this.responder);
350  }
351
352  @Override
353  protected void doRespond(RpcResponse resp) throws IOException {
354    responder.doRespond(this, resp);
355  }
356}