001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.nio.ByteBuffer;
022import java.nio.channels.CancelledKeyException;
023import java.nio.channels.ClosedChannelException;
024import java.nio.channels.SelectionKey;
025import java.nio.channels.Selector;
026import java.util.ArrayList;
027import java.util.Collections;
028import java.util.Iterator;
029import java.util.Set;
030import java.util.concurrent.ConcurrentHashMap;
031import org.apache.hadoop.hbase.HBaseIOException;
032import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
035import org.apache.hadoop.hbase.util.Threads;
036import org.apache.hadoop.util.StringUtils;
037import org.apache.yetus.audience.InterfaceAudience;
038
039/**
040 * Sends responses of RPC back to clients.
041 */
042@Deprecated
043@InterfaceAudience.Private
044class SimpleRpcServerResponder extends Thread {
045
046  private final SimpleRpcServer simpleRpcServer;
047  private final Selector writeSelector;
048  private final Set<SimpleServerRpcConnection> writingCons =
049    Collections.newSetFromMap(new ConcurrentHashMap<>());
050
051  SimpleRpcServerResponder(SimpleRpcServer simpleRpcServer) throws IOException {
052    this.simpleRpcServer = simpleRpcServer;
053    this.setName("RpcServer.responder");
054    this.setDaemon(true);
055    this.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER);
056    writeSelector = Selector.open(); // create a selector
057  }
058
059  @Override
060  public void run() {
061    SimpleRpcServer.LOG.debug(getName() + ": starting");
062    try {
063      doRunLoop();
064    } finally {
065      SimpleRpcServer.LOG.info(getName() + ": stopping");
066      try {
067        writeSelector.close();
068      } catch (IOException ioe) {
069        SimpleRpcServer.LOG.error(getName() + ": couldn't close write selector", ioe);
070      }
071    }
072  }
073
074  /**
075   * Take the list of the connections that want to write, and register them in the selector.
076   */
077  private void registerWrites() {
078    Iterator<SimpleServerRpcConnection> it = writingCons.iterator();
079    while (it.hasNext()) {
080      SimpleServerRpcConnection c = it.next();
081      it.remove();
082      SelectionKey sk = c.channel.keyFor(writeSelector);
083      try {
084        if (sk == null) {
085          try {
086            c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
087          } catch (ClosedChannelException e) {
088            // ignore: the client went away.
089            if (SimpleRpcServer.LOG.isTraceEnabled()) SimpleRpcServer.LOG.trace("ignored", e);
090          }
091        } else {
092          sk.interestOps(SelectionKey.OP_WRITE);
093        }
094      } catch (CancelledKeyException e) {
095        // ignore: the client went away.
096        if (SimpleRpcServer.LOG.isTraceEnabled()) SimpleRpcServer.LOG.trace("ignored", e);
097      }
098    }
099  }
100
101  /**
102   * Add a connection to the list that want to write,
103   */
104  public void registerForWrite(SimpleServerRpcConnection c) {
105    if (writingCons.add(c)) {
106      writeSelector.wakeup();
107    }
108  }
109
110  private void doRunLoop() {
111    long lastPurgeTime = 0; // last check for old calls.
112    while (this.simpleRpcServer.running) {
113      try {
114        registerWrites();
115        int keyCt = writeSelector.select(this.simpleRpcServer.purgeTimeout);
116        if (keyCt == 0) {
117          continue;
118        }
119
120        Set<SelectionKey> keys = writeSelector.selectedKeys();
121        Iterator<SelectionKey> iter = keys.iterator();
122        while (iter.hasNext()) {
123          SelectionKey key = iter.next();
124          iter.remove();
125          try {
126            if (key.isValid() && key.isWritable()) {
127              doAsyncWrite(key);
128            }
129          } catch (IOException e) {
130            SimpleRpcServer.LOG.debug(getName() + ": asyncWrite", e);
131          }
132        }
133
134        lastPurgeTime = purge(lastPurgeTime);
135
136      } catch (OutOfMemoryError e) {
137        if (this.simpleRpcServer.errorHandler != null) {
138          if (this.simpleRpcServer.errorHandler.checkOOME(e)) {
139            SimpleRpcServer.LOG.info(getName() + ": exiting on OutOfMemoryError");
140            return;
141          }
142        } else {
143          //
144          // we can run out of memory if we have too many threads
145          // log the event and sleep for a minute and give
146          // some thread(s) a chance to finish
147          //
148          SimpleRpcServer.LOG.warn(getName() + ": OutOfMemoryError in server select", e);
149          try {
150            Thread.sleep(60000);
151          } catch (InterruptedException ex) {
152            SimpleRpcServer.LOG.debug("Interrupted while sleeping");
153            return;
154          }
155        }
156      } catch (Exception e) {
157        SimpleRpcServer.LOG
158          .warn(getName() + ": exception in Responder " + StringUtils.stringifyException(e), e);
159      }
160    }
161    SimpleRpcServer.LOG.info(getName() + ": stopped");
162  }
163
164  /**
165   * If there were some calls that have not been sent out for a long time, we close the connection.
166   * @return the time of the purge.
167   */
168  private long purge(long lastPurgeTime) {
169    long now = EnvironmentEdgeManager.currentTime();
170    if (now < lastPurgeTime + this.simpleRpcServer.purgeTimeout) {
171      return lastPurgeTime;
172    }
173
174    ArrayList<SimpleServerRpcConnection> conWithOldCalls = new ArrayList<>();
175    // get the list of channels from list of keys.
176    synchronized (writeSelector.keys()) {
177      for (SelectionKey key : writeSelector.keys()) {
178        SimpleServerRpcConnection connection = (SimpleServerRpcConnection) key.attachment();
179        if (connection == null) {
180          throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
181        }
182        if (
183          connection.lastSentTime > 0
184            && now > connection.lastSentTime + this.simpleRpcServer.purgeTimeout
185        ) {
186          conWithOldCalls.add(connection);
187        }
188      }
189    }
190
191    // Seems safer to close the connection outside of the synchronized loop...
192    for (SimpleServerRpcConnection connection : conWithOldCalls) {
193      this.simpleRpcServer.closeConnection(connection);
194    }
195
196    return now;
197  }
198
199  private void doAsyncWrite(SelectionKey key) throws IOException {
200    SimpleServerRpcConnection connection = (SimpleServerRpcConnection) key.attachment();
201    if (connection == null) {
202      throw new IOException("doAsyncWrite: no connection");
203    }
204    if (key.channel() != connection.channel) {
205      throw new IOException("doAsyncWrite: bad channel");
206    }
207
208    if (processAllResponses(connection)) {
209      try {
210        // We wrote everything, so we don't need to be told when the socket is ready for
211        // write anymore.
212        key.interestOps(0);
213      } catch (CancelledKeyException e) {
214        /*
215         * The Listener/reader might have closed the socket. We don't explicitly cancel the key, so
216         * not sure if this will ever fire. This warning could be removed.
217         */
218        SimpleRpcServer.LOG.warn("Exception while changing ops : " + e);
219      }
220    }
221  }
222
223  private BufferChain wrapWithSasl(HBaseSaslRpcServer saslServer, BufferChain bc)
224    throws IOException {
225    // Looks like no way around this; saslserver wants a byte array. I have to make it one.
226    // THIS IS A BIG UGLY COPY.
227    byte[] responseBytes = bc.getBytes();
228    byte[] token;
229    // synchronization may be needed since there can be multiple Handler
230    // threads using saslServer or Crypto AES to wrap responses.
231    synchronized (saslServer) {
232      token = saslServer.wrap(responseBytes, 0, responseBytes.length);
233    }
234    if (SimpleRpcServer.LOG.isTraceEnabled()) {
235      SimpleRpcServer.LOG
236        .trace("Adding saslServer wrapped token of size " + token.length + " as call response.");
237    }
238
239    ByteBuffer[] responseBufs = new ByteBuffer[2];
240    responseBufs[0] = ByteBuffer.wrap(Bytes.toBytes(token.length));
241    responseBufs[1] = ByteBuffer.wrap(token);
242    return new BufferChain(responseBufs);
243  }
244
245  /**
246   * Process the response for this call. You need to have the lock on
247   * {@link org.apache.hadoop.hbase.ipc.SimpleServerRpcConnection#responseWriteLock}
248   * @return true if we proceed the call fully, false otherwise.
249   */
250  private boolean processResponse(SimpleServerRpcConnection conn, RpcResponse resp)
251    throws IOException {
252    boolean error = true;
253    BufferChain buf = resp.getResponse();
254    if (conn.useWrap) {
255      buf = wrapWithSasl(conn.saslServer, buf);
256    }
257    try {
258      // Send as much data as we can in the non-blocking fashion
259      long numBytes = this.simpleRpcServer.channelWrite(conn.channel, buf);
260      if (numBytes < 0) {
261        throw new HBaseIOException("Error writing on the socket " + conn);
262      }
263      error = false;
264    } finally {
265      if (error) {
266        SimpleRpcServer.LOG.debug(conn + ": output error -- closing");
267        // We will be closing this connection itself. Mark this call as done so that all the
268        // buffer(s) it got from pool can get released
269        resp.done();
270        this.simpleRpcServer.closeConnection(conn);
271      }
272    }
273
274    if (!buf.hasRemaining()) {
275      resp.done();
276      return true;
277    } else {
278      // set the serve time when the response has to be sent later
279      conn.lastSentTime = EnvironmentEdgeManager.currentTime();
280      return false; // Socket can't take more, we will have to come back.
281    }
282  }
283
284  /**
285   * Process all the responses for this connection
286   * @return true if all the calls were processed or that someone else is doing it. false if there
287   *         is still some work to do. In this case, we expect the caller to delay us.
288   */
289  private boolean processAllResponses(final SimpleServerRpcConnection connection)
290    throws IOException {
291    // We want only one writer on the channel for a connection at a time.
292    connection.responseWriteLock.lock();
293    try {
294      for (int i = 0; i < 20; i++) {
295        // protection if some handlers manage to need all the responder
296        RpcResponse resp = connection.responseQueue.pollFirst();
297        if (resp == null) {
298          return true;
299        }
300        if (!processResponse(connection, resp)) {
301          connection.responseQueue.addFirst(resp);
302          return false;
303        }
304      }
305    } finally {
306      connection.responseWriteLock.unlock();
307    }
308
309    return connection.responseQueue.isEmpty();
310  }
311
312  //
313  // Enqueue a response from the application.
314  //
315  void doRespond(SimpleServerRpcConnection conn, RpcResponse resp) throws IOException {
316    boolean added = false;
317    // If there is already a write in progress, we don't wait. This allows to free the handlers
318    // immediately for other tasks.
319    if (conn.responseQueue.isEmpty() && conn.responseWriteLock.tryLock()) {
320      try {
321        if (conn.responseQueue.isEmpty()) {
322          // If we're alone, we can try to do a direct call to the socket. It's
323          // an optimization to save on context switches and data transfer between cores..
324          if (processResponse(conn, resp)) {
325            return; // we're done.
326          }
327          // Too big to fit, putting ahead.
328          conn.responseQueue.addFirst(resp);
329          added = true; // We will register to the selector later, outside of the lock.
330        }
331      } finally {
332        conn.responseWriteLock.unlock();
333      }
334    }
335
336    if (!added) {
337      conn.responseQueue.addLast(resp);
338    }
339    registerForWrite(conn);
340  }
341}