001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.ByteArrayInputStream;
021import java.io.IOException;
022import java.io.InputStream;
023import java.net.InetAddress;
024import java.net.Socket;
025import java.nio.ByteBuffer;
026import java.nio.channels.Channels;
027import java.nio.channels.ReadableByteChannel;
028import java.nio.channels.SocketChannel;
029import java.util.concurrent.ConcurrentLinkedDeque;
030import java.util.concurrent.atomic.LongAdder;
031import java.util.concurrent.locks.Lock;
032import java.util.concurrent.locks.ReentrantLock;
033import org.apache.hadoop.hbase.DoNotRetryIOException;
034import org.apache.hadoop.hbase.ExtendedCellScanner;
035import org.apache.hadoop.hbase.client.VersionInfoUtil;
036import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
037import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
038import org.apache.hadoop.hbase.nio.ByteBuff;
039import org.apache.hadoop.hbase.nio.SingleByteBuff;
040import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
041import org.apache.hadoop.hbase.security.SaslStatus;
042import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
043import org.apache.hadoop.io.BytesWritable;
044import org.apache.yetus.audience.InterfaceAudience;
045
046import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
047import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
048import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
049import org.apache.hbase.thirdparty.com.google.protobuf.Message;
050
051import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
053
054/** Reads calls from a connection and queues them for handling. */
055@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
056    justification = "False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
057@Deprecated
058@InterfaceAudience.Private
059class SimpleServerRpcConnection extends ServerRpcConnection {
060
061  final SocketChannel channel;
062  private ByteBuff data;
063  private ByteBuffer dataLengthBuffer;
064  private ByteBuffer preambleBuffer;
065  private final LongAdder rpcCount = new LongAdder(); // number of outstanding rpcs
066  private long lastContact;
067  private final Socket socket;
068  final SimpleRpcServerResponder responder;
069
070  // If initial preamble with version and magic has been read or not.
071  private boolean connectionPreambleRead = false;
072  private boolean saslContextEstablished;
073  private ByteBuffer unwrappedData;
074  // When is this set? FindBugs wants to know! Says NP
075  private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
076  boolean useWrap = false;
077
078  final ConcurrentLinkedDeque<RpcResponse> responseQueue = new ConcurrentLinkedDeque<>();
079  final Lock responseWriteLock = new ReentrantLock();
080  long lastSentTime = -1L;
081
082  public SimpleServerRpcConnection(SimpleRpcServer rpcServer, SocketChannel channel,
083    long lastContact) {
084    super(rpcServer);
085    this.channel = channel;
086    this.lastContact = lastContact;
087    this.data = null;
088    this.dataLengthBuffer = ByteBuffer.allocate(4);
089    this.socket = channel.socket();
090    this.addr = socket.getInetAddress();
091    if (addr == null) {
092      this.hostAddress = "*Unknown*";
093    } else {
094      this.hostAddress = addr.getHostAddress();
095    }
096    this.remotePort = socket.getPort();
097    if (rpcServer.socketSendBufferSize != 0) {
098      try {
099        socket.setSendBufferSize(rpcServer.socketSendBufferSize);
100      } catch (IOException e) {
101        SimpleRpcServer.LOG.warn(
102          "Connection: unable to set socket send buffer size to " + rpcServer.socketSendBufferSize);
103      }
104    }
105    this.responder = rpcServer.responder;
106  }
107
108  public void setLastContact(long lastContact) {
109    this.lastContact = lastContact;
110  }
111
112  public long getLastContact() {
113    return lastContact;
114  }
115
116  /* Return true if the connection has no outstanding rpc */
117  boolean isIdle() {
118    return rpcCount.sum() == 0;
119  }
120
121  /* Decrement the outstanding RPC count */
122  protected void decRpcCount() {
123    rpcCount.decrement();
124  }
125
126  /* Increment the outstanding RPC count */
127  protected void incRpcCount() {
128    rpcCount.increment();
129  }
130
131  private int readPreamble() throws IOException {
132    if (preambleBuffer == null) {
133      preambleBuffer = ByteBuffer.allocate(6);
134    }
135    int count = this.rpcServer.channelRead(channel, preambleBuffer);
136    if (count < 0 || preambleBuffer.remaining() > 0) {
137      return count;
138    }
139    preambleBuffer.flip();
140    PreambleResponse resp = processPreamble(preambleBuffer);
141    switch (resp) {
142      case SUCCEED:
143        preambleBuffer = null; // do not need it anymore
144        connectionPreambleRead = true;
145        return count;
146      case CONTINUE:
147        // wait for the next preamble header
148        preambleBuffer.clear();
149        return count;
150      case CLOSE:
151        return -1;
152      default:
153        throw new IllegalArgumentException("Unknown preamble response: " + resp);
154    }
155  }
156
157  private int read4Bytes() throws IOException {
158    if (this.dataLengthBuffer.remaining() > 0) {
159      return this.rpcServer.channelRead(channel, this.dataLengthBuffer);
160    } else {
161      return 0;
162    }
163  }
164
165  private void processUnwrappedData(byte[] inBuf) throws IOException, InterruptedException {
166    ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
167    // Read all RPCs contained in the inBuf, even partial ones
168    while (true) {
169      int count;
170      if (unwrappedDataLengthBuffer.remaining() > 0) {
171        count = this.rpcServer.channelRead(ch, unwrappedDataLengthBuffer);
172        if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0) {
173          return;
174        }
175      }
176
177      if (unwrappedData == null) {
178        unwrappedDataLengthBuffer.flip();
179        int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
180
181        if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
182          if (RpcServer.LOG.isDebugEnabled()) RpcServer.LOG.debug("Received ping message");
183          unwrappedDataLengthBuffer.clear();
184          continue; // ping message
185        }
186        unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
187      }
188
189      count = this.rpcServer.channelRead(ch, unwrappedData);
190      if (count <= 0 || unwrappedData.remaining() > 0) {
191        return;
192      }
193
194      if (unwrappedData.remaining() == 0) {
195        unwrappedDataLengthBuffer.clear();
196        unwrappedData.flip();
197        processOneRpc(new SingleByteBuff(unwrappedData));
198        unwrappedData = null;
199      }
200    }
201  }
202
203  private void saslReadAndProcess(ByteBuff saslToken) throws IOException, InterruptedException {
204    if (saslContextEstablished) {
205      RpcServer.LOG.trace("Read input token of size={} for processing by saslServer.unwrap()",
206        saslToken.limit());
207      if (!useWrap) {
208        processOneRpc(saslToken);
209      } else {
210        byte[] b = saslToken.hasArray() ? saslToken.array() : saslToken.toBytes();
211        byte[] plaintextData = saslServer.unwrap(b, 0, b.length);
212        // release the request buffer as we have already unwrapped all its content
213        callCleanupIfNeeded();
214        processUnwrappedData(plaintextData);
215      }
216    } else {
217      byte[] replyToken;
218      try {
219        try {
220          getOrCreateSaslServer();
221        } catch (Exception e) {
222          RpcServer.LOG.error("Error when trying to create instance of HBaseSaslRpcServer "
223            + "with sasl provider: " + provider, e);
224          throw e;
225        }
226        RpcServer.LOG.debug("Created SASL server with mechanism={}",
227          provider.getSaslAuthMethod().getAuthMethod());
228        RpcServer.LOG.debug(
229          "Read input token of size={} for processing by saslServer." + "evaluateResponse()",
230          saslToken.limit());
231        replyToken = saslServer
232          .evaluateResponse(saslToken.hasArray() ? saslToken.array() : saslToken.toBytes());
233      } catch (IOException e) {
234        RpcServer.LOG.debug("Failed to execute SASL handshake", e);
235        Throwable sendToClient = HBaseSaslRpcServer.unwrap(e);
236        doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
237          sendToClient.getLocalizedMessage());
238        this.rpcServer.metrics.authenticationFailure();
239        String clientIP = this.toString();
240        // attempting user could be null
241        RpcServer.AUDITLOG.warn("{}{}: {}", RpcServer.AUTH_FAILED_FOR, clientIP,
242          saslServer.getAttemptingUser());
243        throw e;
244      } finally {
245        // release the request buffer as we have already unwrapped all its content
246        callCleanupIfNeeded();
247      }
248      if (replyToken != null) {
249        if (RpcServer.LOG.isDebugEnabled()) {
250          RpcServer.LOG.debug("Will send token of size " + replyToken.length + " from saslServer.");
251        }
252        doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null, null);
253      }
254      if (saslServer.isComplete()) {
255        String qop = saslServer.getNegotiatedQop();
256        useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
257        ugi =
258          provider.getAuthorizedUgi(saslServer.getAuthorizationID(), this.rpcServer.secretManager);
259        RpcServer.LOG.debug(
260          "SASL server context established. Authenticated client: {}. Negotiated QoP is {}", ugi,
261          qop);
262        this.rpcServer.metrics.authenticationSuccess();
263        RpcServer.AUDITLOG.info(RpcServer.AUTH_SUCCESSFUL_FOR + ugi);
264        saslContextEstablished = true;
265      }
266    }
267  }
268
269  /**
270   * Read off the wire. If there is not enough data to read, update the connection state with what
271   * we have and returns.
272   * @return Returns -1 if failure (and caller will close connection), else zero or more.
273   */
274  public int readAndProcess() throws IOException, InterruptedException {
275    // If we have not read the connection setup preamble, look to see if that is on the wire.
276    if (!connectionPreambleRead) {
277      int count = readPreamble();
278      if (!connectionPreambleRead) {
279        return count;
280      }
281    }
282
283    // Try and read in an int. it will be length of the data to read (or -1 if a ping). We catch the
284    // integer length into the 4-byte this.dataLengthBuffer.
285    int count = read4Bytes();
286    if (count < 0 || dataLengthBuffer.remaining() > 0) {
287      return count;
288    }
289
290    // We have read a length and we have read the preamble. It is either the connection header
291    // or it is a request.
292    if (data == null) {
293      dataLengthBuffer.flip();
294      int dataLength = dataLengthBuffer.getInt();
295      if (dataLength == RpcClient.PING_CALL_ID) {
296        if (!useWrap) { // covers the !useSasl too
297          dataLengthBuffer.clear();
298          return 0; // ping message
299        }
300      }
301      if (dataLength < 0) { // A data length of zero is legal.
302        throw new DoNotRetryIOException(
303          "Unexpected data length " + dataLength + "!! from " + getHostAddress());
304      }
305
306      if (dataLength > this.rpcServer.maxRequestSize) {
307        String msg = "RPC data length of " + dataLength + " received from " + getHostAddress()
308          + " is greater than max allowed " + this.rpcServer.maxRequestSize + ". Set \""
309          + SimpleRpcServer.MAX_REQUEST_SIZE
310          + "\" on server to override this limit (not recommended)";
311        SimpleRpcServer.LOG.warn(msg);
312
313        if (connectionHeaderRead && connectionPreambleRead) {
314          incRpcCount();
315          // Construct InputStream for the non-blocking SocketChannel
316          // We need the InputStream because we want to read only the request header
317          // instead of the whole rpc.
318          ByteBuffer buf = ByteBuffer.allocate(1);
319          InputStream is = new InputStream() {
320            @Override
321            public int read() throws IOException {
322              SimpleServerRpcConnection.this.rpcServer.channelRead(channel, buf);
323              buf.flip();
324              int x = buf.get();
325              buf.flip();
326              return x;
327            }
328          };
329          CodedInputStream cis = CodedInputStream.newInstance(is);
330          int headerSize = cis.readRawVarint32();
331          Message.Builder builder = RequestHeader.newBuilder();
332          ProtobufUtil.mergeFrom(builder, cis, headerSize);
333          RequestHeader header = (RequestHeader) builder.build();
334
335          // Notify the client about the offending request
336          SimpleServerCall reqTooBig = new SimpleServerCall(header.getCallId(), this.service, null,
337            null, null, null, this, 0, this.addr, EnvironmentEdgeManager.currentTime(), 0,
338            this.rpcServer.bbAllocator, this.rpcServer.cellBlockBuilder, null, responder);
339          RequestTooBigException reqTooBigEx = new RequestTooBigException(msg);
340          this.rpcServer.metrics.exception(reqTooBigEx);
341          // Make sure the client recognizes the underlying exception
342          // Otherwise, throw a DoNotRetryIOException.
343          if (
344            VersionInfoUtil.hasMinimumVersion(connectionHeader.getVersionInfo(),
345              RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION)
346          ) {
347            reqTooBig.setResponse(null, null, reqTooBigEx, msg);
348          } else {
349            reqTooBig.setResponse(null, null, new DoNotRetryIOException(msg), msg);
350          }
351          // In most cases we will write out the response directly. If not, it is still OK to just
352          // close the connection without writing out the reqTooBig response. Do not try to write
353          // out directly here, and it will cause deserialization error if the connection is slow
354          // and we have a half writing response in the queue.
355          reqTooBig.sendResponseIfReady();
356        }
357        // Close the connection
358        return -1;
359      }
360
361      // Initialize this.data with a ByteBuff.
362      // This call will allocate a ByteBuff to read request into and assign to this.data
363      // Also when we use some buffer(s) from pool, it will create a CallCleanup instance also and
364      // assign to this.callCleanup
365      initByteBuffToReadInto(dataLength);
366
367      // Increment the rpc count. This counter will be decreased when we write
368      // the response. If we want the connection to be detected as idle properly, we
369      // need to keep the inc / dec correct.
370      incRpcCount();
371    }
372
373    count = channelDataRead(channel, data);
374
375    if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
376      process();
377    }
378
379    return count;
380  }
381
382  // It creates the ByteBuff and CallCleanup and assign to Connection instance.
383  private void initByteBuffToReadInto(int length) {
384    this.data = rpcServer.bbAllocator.allocate(length);
385    this.callCleanup = data::release;
386  }
387
388  protected int channelDataRead(ReadableByteChannel channel, ByteBuff buf) throws IOException {
389    int count = buf.read(channel);
390    if (count > 0) {
391      this.rpcServer.metrics.receivedBytes(count);
392    }
393    return count;
394  }
395
396  /**
397   * Process the data buffer and clean the connection state for the next call.
398   */
399  private void process() throws IOException, InterruptedException {
400    data.rewind();
401    try {
402      if (skipInitialSaslHandshake) {
403        skipInitialSaslHandshake = false;
404        return;
405      }
406
407      if (useSasl) {
408        saslReadAndProcess(data);
409      } else {
410        processOneRpc(data);
411      }
412    } catch (Exception e) {
413      callCleanupIfNeeded();
414      throw e;
415    } finally {
416      dataLengthBuffer.clear(); // Clean for the next call
417      data = null; // For the GC
418      this.callCleanup = null;
419    }
420  }
421
422  @Override
423  public synchronized void close() {
424    disposeSasl();
425    data = null;
426    callCleanupIfNeeded();
427    if (!channel.isOpen()) {
428      return;
429    }
430    try {
431      socket.shutdownOutput();
432    } catch (Exception ignored) {
433      if (SimpleRpcServer.LOG.isTraceEnabled()) {
434        SimpleRpcServer.LOG.trace("Ignored exception", ignored);
435      }
436    }
437    if (channel.isOpen()) {
438      try {
439        channel.close();
440      } catch (Exception ignored) {
441      }
442    }
443    try {
444      socket.close();
445    } catch (Exception ignored) {
446      if (SimpleRpcServer.LOG.isTraceEnabled()) {
447        SimpleRpcServer.LOG.trace("Ignored exception", ignored);
448      }
449    }
450  }
451
452  @Override
453  public boolean isConnectionOpen() {
454    return channel.isOpen();
455  }
456
457  @Override
458  public SimpleServerCall createCall(int id, BlockingService service, MethodDescriptor md,
459    RequestHeader header, Message param, ExtendedCellScanner cellScanner, long size,
460    InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) {
461    return new SimpleServerCall(id, service, md, header, param, cellScanner, this, size,
462      remoteAddress, EnvironmentEdgeManager.currentTime(), timeout, this.rpcServer.bbAllocator,
463      this.rpcServer.cellBlockBuilder, reqCleanup, this.responder);
464  }
465
466  @Override
467  protected void doRespond(RpcResponse resp) throws IOException {
468    responder.doRespond(this, resp);
469  }
470}