/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;

036/**
037 * Sends responses of RPC back to clients.
038 */
039@InterfaceAudience.Private
040class SimpleRpcServerResponder extends Thread {
041
042  private final SimpleRpcServer simpleRpcServer;
043  private final Selector writeSelector;
044  private final Set<SimpleServerRpcConnection> writingCons =
045      Collections.newSetFromMap(new ConcurrentHashMap<>());
046
047  SimpleRpcServerResponder(SimpleRpcServer simpleRpcServer) throws IOException {
048    this.simpleRpcServer = simpleRpcServer;
049    this.setName("RpcServer.responder");
050    this.setDaemon(true);
051    this.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER);
052    writeSelector = Selector.open(); // create a selector
053  }
054
055  @Override
056  public void run() {
057    SimpleRpcServer.LOG.debug(getName() + ": starting");
058    try {
059      doRunLoop();
060    } finally {
061      SimpleRpcServer.LOG.info(getName() + ": stopping");
062      try {
063        writeSelector.close();
064      } catch (IOException ioe) {
065        SimpleRpcServer.LOG.error(getName() + ": couldn't close write selector", ioe);
066      }
067    }
068  }
069
070  /**
071   * Take the list of the connections that want to write, and register them in the selector.
072   */
073  private void registerWrites() {
074    Iterator<SimpleServerRpcConnection> it = writingCons.iterator();
075    while (it.hasNext()) {
076      SimpleServerRpcConnection c = it.next();
077      it.remove();
078      SelectionKey sk = c.channel.keyFor(writeSelector);
079      try {
080        if (sk == null) {
081          try {
082            c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
083          } catch (ClosedChannelException e) {
084            // ignore: the client went away.
085            if (SimpleRpcServer.LOG.isTraceEnabled()) SimpleRpcServer.LOG.trace("ignored", e);
086          }
087        } else {
088          sk.interestOps(SelectionKey.OP_WRITE);
089        }
090      } catch (CancelledKeyException e) {
091        // ignore: the client went away.
092        if (SimpleRpcServer.LOG.isTraceEnabled()) SimpleRpcServer.LOG.trace("ignored", e);
093      }
094    }
095  }
096
097  /**
098   * Add a connection to the list that want to write,
099   */
100  public void registerForWrite(SimpleServerRpcConnection c) {
101    if (writingCons.add(c)) {
102      writeSelector.wakeup();
103    }
104  }
105
106  private void doRunLoop() {
107    long lastPurgeTime = 0; // last check for old calls.
108    while (this.simpleRpcServer.running) {
109      try {
110        registerWrites();
111        int keyCt = writeSelector.select(this.simpleRpcServer.purgeTimeout);
112        if (keyCt == 0) {
113          continue;
114        }
115
116        Set<SelectionKey> keys = writeSelector.selectedKeys();
117        Iterator<SelectionKey> iter = keys.iterator();
118        while (iter.hasNext()) {
119          SelectionKey key = iter.next();
120          iter.remove();
121          try {
122            if (key.isValid() && key.isWritable()) {
123              doAsyncWrite(key);
124            }
125          } catch (IOException e) {
126            SimpleRpcServer.LOG.debug(getName() + ": asyncWrite", e);
127          }
128        }
129
130        lastPurgeTime = purge(lastPurgeTime);
131
132      } catch (OutOfMemoryError e) {
133        if (this.simpleRpcServer.errorHandler != null) {
134          if (this.simpleRpcServer.errorHandler.checkOOME(e)) {
135            SimpleRpcServer.LOG.info(getName() + ": exiting on OutOfMemoryError");
136            return;
137          }
138        } else {
139          //
140          // we can run out of memory if we have too many threads
141          // log the event and sleep for a minute and give
142          // some thread(s) a chance to finish
143          //
144          SimpleRpcServer.LOG.warn(getName() + ": OutOfMemoryError in server select", e);
145          try {
146            Thread.sleep(60000);
147          } catch (InterruptedException ex) {
148            SimpleRpcServer.LOG.debug("Interrupted while sleeping");
149            return;
150          }
151        }
152      } catch (Exception e) {
153        SimpleRpcServer.LOG
154            .warn(getName() + ": exception in Responder " + StringUtils.stringifyException(e), e);
155      }
156    }
157    SimpleRpcServer.LOG.info(getName() + ": stopped");
158  }
159
160  /**
161   * If there were some calls that have not been sent out for a long time, we close the connection.
162   * @return the time of the purge.
163   */
164  private long purge(long lastPurgeTime) {
165    long now = System.currentTimeMillis();
166    if (now < lastPurgeTime + this.simpleRpcServer.purgeTimeout) {
167      return lastPurgeTime;
168    }
169
170    ArrayList<SimpleServerRpcConnection> conWithOldCalls = new ArrayList<>();
171    // get the list of channels from list of keys.
172    synchronized (writeSelector.keys()) {
173      for (SelectionKey key : writeSelector.keys()) {
174        SimpleServerRpcConnection connection = (SimpleServerRpcConnection) key.attachment();
175        if (connection == null) {
176          throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
177        }
178        if (connection.lastSentTime > 0 &&
179            now > connection.lastSentTime + this.simpleRpcServer.purgeTimeout) {
180          conWithOldCalls.add(connection);
181        }
182      }
183    }
184
185    // Seems safer to close the connection outside of the synchronized loop...
186    for (SimpleServerRpcConnection connection : conWithOldCalls) {
187      this.simpleRpcServer.closeConnection(connection);
188    }
189
190    return now;
191  }
192
193  private void doAsyncWrite(SelectionKey key) throws IOException {
194    SimpleServerRpcConnection connection = (SimpleServerRpcConnection) key.attachment();
195    if (connection == null) {
196      throw new IOException("doAsyncWrite: no connection");
197    }
198    if (key.channel() != connection.channel) {
199      throw new IOException("doAsyncWrite: bad channel");
200    }
201
202    if (processAllResponses(connection)) {
203      try {
204        // We wrote everything, so we don't need to be told when the socket is ready for
205        // write anymore.
206        key.interestOps(0);
207      } catch (CancelledKeyException e) {
208        /*
209         * The Listener/reader might have closed the socket. We don't explicitly cancel the key, so
210         * not sure if this will ever fire. This warning could be removed.
211         */
212        SimpleRpcServer.LOG.warn("Exception while changing ops : " + e);
213      }
214    }
215  }
216
217  /**
218   * Process the response for this call. You need to have the lock on
219   * {@link org.apache.hadoop.hbase.ipc.SimpleServerRpcConnection#responseWriteLock}
220   * @return true if we proceed the call fully, false otherwise.
221   * @throws IOException
222   */
223  private boolean processResponse(SimpleServerRpcConnection conn, RpcResponse resp)
224      throws IOException {
225    boolean error = true;
226    BufferChain buf = resp.getResponse();
227    try {
228      // Send as much data as we can in the non-blocking fashion
229      long numBytes =
230          this.simpleRpcServer.channelWrite(conn.channel, buf);
231      if (numBytes < 0) {
232        throw new HBaseIOException("Error writing on the socket " + conn);
233      }
234      error = false;
235    } finally {
236      if (error) {
237        SimpleRpcServer.LOG.debug(conn + ": output error -- closing");
238        // We will be closing this connection itself. Mark this call as done so that all the
239        // buffer(s) it got from pool can get released
240        resp.done();
241        this.simpleRpcServer.closeConnection(conn);
242      }
243    }
244
245    if (!buf.hasRemaining()) {
246      resp.done();
247      return true;
248    } else {
249      // set the serve time when the response has to be sent later
250      conn.lastSentTime = System.currentTimeMillis();
251      return false; // Socket can't take more, we will have to come back.
252    }
253  }
254
255  /**
256   * Process all the responses for this connection
257   * @return true if all the calls were processed or that someone else is doing it. false if there *
258   *         is still some work to do. In this case, we expect the caller to delay us.
259   * @throws IOException
260   */
261  private boolean processAllResponses(final SimpleServerRpcConnection connection)
262      throws IOException {
263    // We want only one writer on the channel for a connection at a time.
264    connection.responseWriteLock.lock();
265    try {
266      for (int i = 0; i < 20; i++) {
267        // protection if some handlers manage to need all the responder
268        RpcResponse resp = connection.responseQueue.pollFirst();
269        if (resp == null) {
270          return true;
271        }
272        if (!processResponse(connection, resp)) {
273          connection.responseQueue.addFirst(resp);
274          return false;
275        }
276      }
277    } finally {
278      connection.responseWriteLock.unlock();
279    }
280
281    return connection.responseQueue.isEmpty();
282  }
283
284  //
285  // Enqueue a response from the application.
286  //
287  void doRespond(SimpleServerRpcConnection conn, RpcResponse resp) throws IOException {
288    boolean added = false;
289    // If there is already a write in progress, we don't wait. This allows to free the handlers
290    // immediately for other tasks.
291    if (conn.responseQueue.isEmpty() && conn.responseWriteLock.tryLock()) {
292      try {
293        if (conn.responseQueue.isEmpty()) {
294          // If we're alone, we can try to do a direct call to the socket. It's
295          // an optimization to save on context switches and data transfer between cores..
296          if (processResponse(conn, resp)) {
297            return; // we're done.
298          }
299          // Too big to fit, putting ahead.
300          conn.responseQueue.addFirst(resp);
301          added = true; // We will register to the selector later, outside of the lock.
302        }
303      } finally {
304        conn.responseWriteLock.unlock();
305      }
306    }
307
308    if (!added) {
309      conn.responseQueue.addLast(resp);
310    }
311    registerForWrite(conn);
312  }
313}