001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.nio.channels.CancelledKeyException;
022import java.nio.channels.ClosedChannelException;
023import java.nio.channels.SelectionKey;
024import java.nio.channels.Selector;
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.Iterator;
028import java.util.Set;
029import java.util.concurrent.ConcurrentHashMap;
030import org.apache.hadoop.hbase.HBaseIOException;
031import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
032import org.apache.hadoop.hbase.util.Threads;
033import org.apache.hadoop.util.StringUtils;
034import org.apache.yetus.audience.InterfaceAudience;
035
036/**
037 * Sends responses of RPC back to clients.
038 */
039@InterfaceAudience.Private
040class SimpleRpcServerResponder extends Thread {
041
042  private final SimpleRpcServer simpleRpcServer;
043  private final Selector writeSelector;
044  private final Set<SimpleServerRpcConnection> writingCons =
045    Collections.newSetFromMap(new ConcurrentHashMap<>());
046
047  SimpleRpcServerResponder(SimpleRpcServer simpleRpcServer) throws IOException {
048    this.simpleRpcServer = simpleRpcServer;
049    this.setName("RpcServer.responder");
050    this.setDaemon(true);
051    this.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER);
052    writeSelector = Selector.open(); // create a selector
053  }
054
055  @Override
056  public void run() {
057    SimpleRpcServer.LOG.debug(getName() + ": starting");
058    try {
059      doRunLoop();
060    } finally {
061      SimpleRpcServer.LOG.info(getName() + ": stopping");
062      try {
063        writeSelector.close();
064      } catch (IOException ioe) {
065        SimpleRpcServer.LOG.error(getName() + ": couldn't close write selector", ioe);
066      }
067    }
068  }
069
070  /**
071   * Take the list of the connections that want to write, and register them in the selector.
072   */
073  private void registerWrites() {
074    Iterator<SimpleServerRpcConnection> it = writingCons.iterator();
075    while (it.hasNext()) {
076      SimpleServerRpcConnection c = it.next();
077      it.remove();
078      SelectionKey sk = c.channel.keyFor(writeSelector);
079      try {
080        if (sk == null) {
081          try {
082            c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
083          } catch (ClosedChannelException e) {
084            // ignore: the client went away.
085            if (SimpleRpcServer.LOG.isTraceEnabled()) SimpleRpcServer.LOG.trace("ignored", e);
086          }
087        } else {
088          sk.interestOps(SelectionKey.OP_WRITE);
089        }
090      } catch (CancelledKeyException e) {
091        // ignore: the client went away.
092        if (SimpleRpcServer.LOG.isTraceEnabled()) SimpleRpcServer.LOG.trace("ignored", e);
093      }
094    }
095  }
096
097  /**
098   * Add a connection to the list that want to write,
099   */
100  public void registerForWrite(SimpleServerRpcConnection c) {
101    if (writingCons.add(c)) {
102      writeSelector.wakeup();
103    }
104  }
105
106  private void doRunLoop() {
107    long lastPurgeTime = 0; // last check for old calls.
108    while (this.simpleRpcServer.running) {
109      try {
110        registerWrites();
111        int keyCt = writeSelector.select(this.simpleRpcServer.purgeTimeout);
112        if (keyCt == 0) {
113          continue;
114        }
115
116        Set<SelectionKey> keys = writeSelector.selectedKeys();
117        Iterator<SelectionKey> iter = keys.iterator();
118        while (iter.hasNext()) {
119          SelectionKey key = iter.next();
120          iter.remove();
121          try {
122            if (key.isValid() && key.isWritable()) {
123              doAsyncWrite(key);
124            }
125          } catch (IOException e) {
126            SimpleRpcServer.LOG.debug(getName() + ": asyncWrite", e);
127          }
128        }
129
130        lastPurgeTime = purge(lastPurgeTime);
131
132      } catch (OutOfMemoryError e) {
133        if (this.simpleRpcServer.errorHandler != null) {
134          if (this.simpleRpcServer.errorHandler.checkOOME(e)) {
135            SimpleRpcServer.LOG.info(getName() + ": exiting on OutOfMemoryError");
136            return;
137          }
138        } else {
139          //
140          // we can run out of memory if we have too many threads
141          // log the event and sleep for a minute and give
142          // some thread(s) a chance to finish
143          //
144          SimpleRpcServer.LOG.warn(getName() + ": OutOfMemoryError in server select", e);
145          try {
146            Thread.sleep(60000);
147          } catch (InterruptedException ex) {
148            SimpleRpcServer.LOG.debug("Interrupted while sleeping");
149            return;
150          }
151        }
152      } catch (Exception e) {
153        SimpleRpcServer.LOG
154          .warn(getName() + ": exception in Responder " + StringUtils.stringifyException(e), e);
155      }
156    }
157    SimpleRpcServer.LOG.info(getName() + ": stopped");
158  }
159
160  /**
161   * If there were some calls that have not been sent out for a long time, we close the connection.
162   * @return the time of the purge.
163   */
164  private long purge(long lastPurgeTime) {
165    long now = EnvironmentEdgeManager.currentTime();
166    if (now < lastPurgeTime + this.simpleRpcServer.purgeTimeout) {
167      return lastPurgeTime;
168    }
169
170    ArrayList<SimpleServerRpcConnection> conWithOldCalls = new ArrayList<>();
171    // get the list of channels from list of keys.
172    synchronized (writeSelector.keys()) {
173      for (SelectionKey key : writeSelector.keys()) {
174        SimpleServerRpcConnection connection = (SimpleServerRpcConnection) key.attachment();
175        if (connection == null) {
176          throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
177        }
178        if (
179          connection.lastSentTime > 0
180            && now > connection.lastSentTime + this.simpleRpcServer.purgeTimeout
181        ) {
182          conWithOldCalls.add(connection);
183        }
184      }
185    }
186
187    // Seems safer to close the connection outside of the synchronized loop...
188    for (SimpleServerRpcConnection connection : conWithOldCalls) {
189      this.simpleRpcServer.closeConnection(connection);
190    }
191
192    return now;
193  }
194
195  private void doAsyncWrite(SelectionKey key) throws IOException {
196    SimpleServerRpcConnection connection = (SimpleServerRpcConnection) key.attachment();
197    if (connection == null) {
198      throw new IOException("doAsyncWrite: no connection");
199    }
200    if (key.channel() != connection.channel) {
201      throw new IOException("doAsyncWrite: bad channel");
202    }
203
204    if (processAllResponses(connection)) {
205      try {
206        // We wrote everything, so we don't need to be told when the socket is ready for
207        // write anymore.
208        key.interestOps(0);
209      } catch (CancelledKeyException e) {
210        /*
211         * The Listener/reader might have closed the socket. We don't explicitly cancel the key, so
212         * not sure if this will ever fire. This warning could be removed.
213         */
214        SimpleRpcServer.LOG.warn("Exception while changing ops : " + e);
215      }
216    }
217  }
218
219  /**
220   * Process the response for this call. You need to have the lock on
221   * {@link org.apache.hadoop.hbase.ipc.SimpleServerRpcConnection#responseWriteLock}
222   * @return true if we proceed the call fully, false otherwise. n
223   */
224  private boolean processResponse(SimpleServerRpcConnection conn, RpcResponse resp)
225    throws IOException {
226    boolean error = true;
227    BufferChain buf = resp.getResponse();
228    try {
229      // Send as much data as we can in the non-blocking fashion
230      long numBytes = this.simpleRpcServer.channelWrite(conn.channel, buf);
231      if (numBytes < 0) {
232        throw new HBaseIOException("Error writing on the socket " + conn);
233      }
234      error = false;
235    } finally {
236      if (error) {
237        SimpleRpcServer.LOG.debug(conn + ": output error -- closing");
238        // We will be closing this connection itself. Mark this call as done so that all the
239        // buffer(s) it got from pool can get released
240        resp.done();
241        this.simpleRpcServer.closeConnection(conn);
242      }
243    }
244
245    if (!buf.hasRemaining()) {
246      resp.done();
247      return true;
248    } else {
249      // set the serve time when the response has to be sent later
250      conn.lastSentTime = EnvironmentEdgeManager.currentTime();
251      return false; // Socket can't take more, we will have to come back.
252    }
253  }
254
255  /**
256   * Process all the responses for this connection
257   * @return true if all the calls were processed or that someone else is doing it. false if there *
258   *         is still some work to do. In this case, we expect the caller to delay us. n
259   */
260  private boolean processAllResponses(final SimpleServerRpcConnection connection)
261    throws IOException {
262    // We want only one writer on the channel for a connection at a time.
263    connection.responseWriteLock.lock();
264    try {
265      for (int i = 0; i < 20; i++) {
266        // protection if some handlers manage to need all the responder
267        RpcResponse resp = connection.responseQueue.pollFirst();
268        if (resp == null) {
269          return true;
270        }
271        if (!processResponse(connection, resp)) {
272          connection.responseQueue.addFirst(resp);
273          return false;
274        }
275      }
276    } finally {
277      connection.responseWriteLock.unlock();
278    }
279
280    return connection.responseQueue.isEmpty();
281  }
282
283  //
284  // Enqueue a response from the application.
285  //
286  void doRespond(SimpleServerRpcConnection conn, RpcResponse resp) throws IOException {
287    boolean added = false;
288    // If there is already a write in progress, we don't wait. This allows to free the handlers
289    // immediately for other tasks.
290    if (conn.responseQueue.isEmpty() && conn.responseWriteLock.tryLock()) {
291      try {
292        if (conn.responseQueue.isEmpty()) {
293          // If we're alone, we can try to do a direct call to the socket. It's
294          // an optimization to save on context switches and data transfer between cores..
295          if (processResponse(conn, resp)) {
296            return; // we're done.
297          }
298          // Too big to fit, putting ahead.
299          conn.responseQueue.addFirst(resp);
300          added = true; // We will register to the selector later, outside of the lock.
301        }
302      } finally {
303        conn.responseWriteLock.unlock();
304      }
305    }
306
307    if (!added) {
308      conn.responseQueue.addLast(resp);
309    }
310    registerForWrite(conn);
311  }
312}