001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.io.InputStream;
022import java.net.InetAddress;
023import java.net.Socket;
024import java.nio.ByteBuffer;
025import java.nio.channels.ReadableByteChannel;
026import java.nio.channels.SocketChannel;
027import java.util.concurrent.ConcurrentLinkedDeque;
028import java.util.concurrent.atomic.LongAdder;
029import java.util.concurrent.locks.Lock;
030import java.util.concurrent.locks.ReentrantLock;
031import org.apache.hadoop.hbase.CellScanner;
032import org.apache.hadoop.hbase.DoNotRetryIOException;
033import org.apache.hadoop.hbase.client.VersionInfoUtil;
034import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
035import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
036import org.apache.hadoop.hbase.nio.ByteBuff;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.yetus.audience.InterfaceAudience;
039
040import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
041import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
042import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
043import org.apache.hbase.thirdparty.com.google.protobuf.Message;
044
045import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
046import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
047
048/** Reads calls from a connection and queues them for handling. */
049@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
050    justification = "False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
051@Deprecated
052@InterfaceAudience.Private
053class SimpleServerRpcConnection extends ServerRpcConnection {
054
055  final SocketChannel channel;
056  private ByteBuff data;
057  private ByteBuffer dataLengthBuffer;
058  private ByteBuffer preambleBuffer;
059  private final LongAdder rpcCount = new LongAdder(); // number of outstanding rpcs
060  private long lastContact;
061  private final Socket socket;
062  final SimpleRpcServerResponder responder;
063
064  // If initial preamble with version and magic has been read or not.
065  private boolean connectionPreambleRead = false;
066
067  final ConcurrentLinkedDeque<RpcResponse> responseQueue = new ConcurrentLinkedDeque<>();
068  final Lock responseWriteLock = new ReentrantLock();
069  long lastSentTime = -1L;
070
071  public SimpleServerRpcConnection(SimpleRpcServer rpcServer, SocketChannel channel,
072    long lastContact) {
073    super(rpcServer);
074    this.channel = channel;
075    this.lastContact = lastContact;
076    this.data = null;
077    this.dataLengthBuffer = ByteBuffer.allocate(4);
078    this.socket = channel.socket();
079    this.addr = socket.getInetAddress();
080    if (addr == null) {
081      this.hostAddress = "*Unknown*";
082    } else {
083      this.hostAddress = addr.getHostAddress();
084    }
085    this.remotePort = socket.getPort();
086    if (rpcServer.socketSendBufferSize != 0) {
087      try {
088        socket.setSendBufferSize(rpcServer.socketSendBufferSize);
089      } catch (IOException e) {
090        SimpleRpcServer.LOG.warn(
091          "Connection: unable to set socket send buffer size to " + rpcServer.socketSendBufferSize);
092      }
093    }
094    this.responder = rpcServer.responder;
095  }
096
097  public void setLastContact(long lastContact) {
098    this.lastContact = lastContact;
099  }
100
101  public long getLastContact() {
102    return lastContact;
103  }
104
105  /* Return true if the connection has no outstanding rpc */
106  boolean isIdle() {
107    return rpcCount.sum() == 0;
108  }
109
110  /* Decrement the outstanding RPC count */
111  protected void decRpcCount() {
112    rpcCount.decrement();
113  }
114
115  /* Increment the outstanding RPC count */
116  protected void incRpcCount() {
117    rpcCount.increment();
118  }
119
120  private int readPreamble() throws IOException {
121    if (preambleBuffer == null) {
122      preambleBuffer = ByteBuffer.allocate(6);
123    }
124    int count = this.rpcServer.channelRead(channel, preambleBuffer);
125    if (count < 0 || preambleBuffer.remaining() > 0) {
126      return count;
127    }
128    preambleBuffer.flip();
129    if (!processPreamble(preambleBuffer)) {
130      return -1;
131    }
132    preambleBuffer = null; // do not need it anymore
133    connectionPreambleRead = true;
134    return count;
135  }
136
137  private int read4Bytes() throws IOException {
138    if (this.dataLengthBuffer.remaining() > 0) {
139      return this.rpcServer.channelRead(channel, this.dataLengthBuffer);
140    } else {
141      return 0;
142    }
143  }
144
145  /**
146   * Read off the wire. If there is not enough data to read, update the connection state with what
147   * we have and returns.
148   * @return Returns -1 if failure (and caller will close connection), else zero or more. nn
149   */
150  public int readAndProcess() throws IOException, InterruptedException {
151    // If we have not read the connection setup preamble, look to see if that is on the wire.
152    if (!connectionPreambleRead) {
153      int count = readPreamble();
154      if (!connectionPreambleRead) {
155        return count;
156      }
157    }
158
159    // Try and read in an int. it will be length of the data to read (or -1 if a ping). We catch the
160    // integer length into the 4-byte this.dataLengthBuffer.
161    int count = read4Bytes();
162    if (count < 0 || dataLengthBuffer.remaining() > 0) {
163      return count;
164    }
165
166    // We have read a length and we have read the preamble. It is either the connection header
167    // or it is a request.
168    if (data == null) {
169      dataLengthBuffer.flip();
170      int dataLength = dataLengthBuffer.getInt();
171      if (dataLength == RpcClient.PING_CALL_ID) {
172        if (!useWrap) { // covers the !useSasl too
173          dataLengthBuffer.clear();
174          return 0; // ping message
175        }
176      }
177      if (dataLength < 0) { // A data length of zero is legal.
178        throw new DoNotRetryIOException(
179          "Unexpected data length " + dataLength + "!! from " + getHostAddress());
180      }
181
182      if (dataLength > this.rpcServer.maxRequestSize) {
183        String msg = "RPC data length of " + dataLength + " received from " + getHostAddress()
184          + " is greater than max allowed " + this.rpcServer.maxRequestSize + ". Set \""
185          + SimpleRpcServer.MAX_REQUEST_SIZE
186          + "\" on server to override this limit (not recommended)";
187        SimpleRpcServer.LOG.warn(msg);
188
189        if (connectionHeaderRead && connectionPreambleRead) {
190          incRpcCount();
191          // Construct InputStream for the non-blocking SocketChannel
192          // We need the InputStream because we want to read only the request header
193          // instead of the whole rpc.
194          ByteBuffer buf = ByteBuffer.allocate(1);
195          InputStream is = new InputStream() {
196            @Override
197            public int read() throws IOException {
198              SimpleServerRpcConnection.this.rpcServer.channelRead(channel, buf);
199              buf.flip();
200              int x = buf.get();
201              buf.flip();
202              return x;
203            }
204          };
205          CodedInputStream cis = CodedInputStream.newInstance(is);
206          int headerSize = cis.readRawVarint32();
207          Message.Builder builder = RequestHeader.newBuilder();
208          ProtobufUtil.mergeFrom(builder, cis, headerSize);
209          RequestHeader header = (RequestHeader) builder.build();
210
211          // Notify the client about the offending request
212          SimpleServerCall reqTooBig = new SimpleServerCall(header.getCallId(), this.service, null,
213            null, null, null, this, 0, this.addr, EnvironmentEdgeManager.currentTime(), 0,
214            this.rpcServer.bbAllocator, this.rpcServer.cellBlockBuilder, null, responder);
215          RequestTooBigException reqTooBigEx = new RequestTooBigException(msg);
216          this.rpcServer.metrics.exception(reqTooBigEx);
217          // Make sure the client recognizes the underlying exception
218          // Otherwise, throw a DoNotRetryIOException.
219          if (
220            VersionInfoUtil.hasMinimumVersion(connectionHeader.getVersionInfo(),
221              RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION)
222          ) {
223            reqTooBig.setResponse(null, null, reqTooBigEx, msg);
224          } else {
225            reqTooBig.setResponse(null, null, new DoNotRetryIOException(msg), msg);
226          }
227          // In most cases we will write out the response directly. If not, it is still OK to just
228          // close the connection without writing out the reqTooBig response. Do not try to write
229          // out directly here, and it will cause deserialization error if the connection is slow
230          // and we have a half writing response in the queue.
231          reqTooBig.sendResponseIfReady();
232        }
233        // Close the connection
234        return -1;
235      }
236
237      // Initialize this.data with a ByteBuff.
238      // This call will allocate a ByteBuff to read request into and assign to this.data
239      // Also when we use some buffer(s) from pool, it will create a CallCleanup instance also and
240      // assign to this.callCleanup
241      initByteBuffToReadInto(dataLength);
242
243      // Increment the rpc count. This counter will be decreased when we write
244      // the response. If we want the connection to be detected as idle properly, we
245      // need to keep the inc / dec correct.
246      incRpcCount();
247    }
248
249    count = channelDataRead(channel, data);
250
251    if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
252      process();
253    }
254
255    return count;
256  }
257
258  // It creates the ByteBuff and CallCleanup and assign to Connection instance.
259  private void initByteBuffToReadInto(int length) {
260    this.data = rpcServer.bbAllocator.allocate(length);
261    this.callCleanup = data::release;
262  }
263
264  protected int channelDataRead(ReadableByteChannel channel, ByteBuff buf) throws IOException {
265    int count = buf.read(channel);
266    if (count > 0) {
267      this.rpcServer.metrics.receivedBytes(count);
268    }
269    return count;
270  }
271
272  /**
273   * Process the data buffer and clean the connection state for the next call.
274   */
275  private void process() throws IOException, InterruptedException {
276    data.rewind();
277    try {
278      if (skipInitialSaslHandshake) {
279        skipInitialSaslHandshake = false;
280        return;
281      }
282
283      if (useSasl) {
284        saslReadAndProcess(data);
285      } else {
286        processOneRpc(data);
287      }
288    } catch (Exception e) {
289      callCleanupIfNeeded();
290      throw e;
291    } finally {
292      dataLengthBuffer.clear(); // Clean for the next call
293      data = null; // For the GC
294      this.callCleanup = null;
295    }
296  }
297
298  @Override
299  public synchronized void close() {
300    disposeSasl();
301    data = null;
302    callCleanupIfNeeded();
303    if (!channel.isOpen()) {
304      return;
305    }
306    try {
307      socket.shutdownOutput();
308    } catch (Exception ignored) {
309      if (SimpleRpcServer.LOG.isTraceEnabled()) {
310        SimpleRpcServer.LOG.trace("Ignored exception", ignored);
311      }
312    }
313    if (channel.isOpen()) {
314      try {
315        channel.close();
316      } catch (Exception ignored) {
317      }
318    }
319    try {
320      socket.close();
321    } catch (Exception ignored) {
322      if (SimpleRpcServer.LOG.isTraceEnabled()) {
323        SimpleRpcServer.LOG.trace("Ignored exception", ignored);
324      }
325    }
326  }
327
328  @Override
329  public boolean isConnectionOpen() {
330    return channel.isOpen();
331  }
332
333  @Override
334  public SimpleServerCall createCall(int id, BlockingService service, MethodDescriptor md,
335    RequestHeader header, Message param, CellScanner cellScanner, long size,
336    InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) {
337    return new SimpleServerCall(id, service, md, header, param, cellScanner, this, size,
338      remoteAddress, EnvironmentEdgeManager.currentTime(), timeout, this.rpcServer.bbAllocator,
339      this.rpcServer.cellBlockBuilder, reqCleanup, this.responder);
340  }
341
342  @Override
343  protected void doRespond(RpcResponse resp) throws IOException {
344    responder.doRespond(this, resp);
345  }
346}