001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.nio.channels.CancelledKeyException;
022import java.nio.channels.ClosedChannelException;
023import java.nio.channels.SelectionKey;
024import java.nio.channels.Selector;
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.Iterator;
028import java.util.Set;
029import java.util.concurrent.ConcurrentHashMap;
030import org.apache.hadoop.hbase.HBaseIOException;
031import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
032import org.apache.hadoop.hbase.util.Threads;
033import org.apache.hadoop.util.StringUtils;
034import org.apache.yetus.audience.InterfaceAudience;
035
036/**
037 * Sends responses of RPC back to clients.
038 */
039@Deprecated
040@InterfaceAudience.Private
041class SimpleRpcServerResponder extends Thread {
042
043  private final SimpleRpcServer simpleRpcServer;
044  private final Selector writeSelector;
045  private final Set<SimpleServerRpcConnection> writingCons =
046    Collections.newSetFromMap(new ConcurrentHashMap<>());
047
048  SimpleRpcServerResponder(SimpleRpcServer simpleRpcServer) throws IOException {
049    this.simpleRpcServer = simpleRpcServer;
050    this.setName("RpcServer.responder");
051    this.setDaemon(true);
052    this.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER);
053    writeSelector = Selector.open(); // create a selector
054  }
055
056  @Override
057  public void run() {
058    SimpleRpcServer.LOG.debug(getName() + ": starting");
059    try {
060      doRunLoop();
061    } finally {
062      SimpleRpcServer.LOG.info(getName() + ": stopping");
063      try {
064        writeSelector.close();
065      } catch (IOException ioe) {
066        SimpleRpcServer.LOG.error(getName() + ": couldn't close write selector", ioe);
067      }
068    }
069  }
070
071  /**
072   * Take the list of the connections that want to write, and register them in the selector.
073   */
074  private void registerWrites() {
075    Iterator<SimpleServerRpcConnection> it = writingCons.iterator();
076    while (it.hasNext()) {
077      SimpleServerRpcConnection c = it.next();
078      it.remove();
079      SelectionKey sk = c.channel.keyFor(writeSelector);
080      try {
081        if (sk == null) {
082          try {
083            c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
084          } catch (ClosedChannelException e) {
085            // ignore: the client went away.
086            if (SimpleRpcServer.LOG.isTraceEnabled()) SimpleRpcServer.LOG.trace("ignored", e);
087          }
088        } else {
089          sk.interestOps(SelectionKey.OP_WRITE);
090        }
091      } catch (CancelledKeyException e) {
092        // ignore: the client went away.
093        if (SimpleRpcServer.LOG.isTraceEnabled()) SimpleRpcServer.LOG.trace("ignored", e);
094      }
095    }
096  }
097
098  /**
099   * Add a connection to the list that want to write,
100   */
101  public void registerForWrite(SimpleServerRpcConnection c) {
102    if (writingCons.add(c)) {
103      writeSelector.wakeup();
104    }
105  }
106
107  private void doRunLoop() {
108    long lastPurgeTime = 0; // last check for old calls.
109    while (this.simpleRpcServer.running) {
110      try {
111        registerWrites();
112        int keyCt = writeSelector.select(this.simpleRpcServer.purgeTimeout);
113        if (keyCt == 0) {
114          continue;
115        }
116
117        Set<SelectionKey> keys = writeSelector.selectedKeys();
118        Iterator<SelectionKey> iter = keys.iterator();
119        while (iter.hasNext()) {
120          SelectionKey key = iter.next();
121          iter.remove();
122          try {
123            if (key.isValid() && key.isWritable()) {
124              doAsyncWrite(key);
125            }
126          } catch (IOException e) {
127            SimpleRpcServer.LOG.debug(getName() + ": asyncWrite", e);
128          }
129        }
130
131        lastPurgeTime = purge(lastPurgeTime);
132
133      } catch (OutOfMemoryError e) {
134        if (this.simpleRpcServer.errorHandler != null) {
135          if (this.simpleRpcServer.errorHandler.checkOOME(e)) {
136            SimpleRpcServer.LOG.info(getName() + ": exiting on OutOfMemoryError");
137            return;
138          }
139        } else {
140          //
141          // we can run out of memory if we have too many threads
142          // log the event and sleep for a minute and give
143          // some thread(s) a chance to finish
144          //
145          SimpleRpcServer.LOG.warn(getName() + ": OutOfMemoryError in server select", e);
146          try {
147            Thread.sleep(60000);
148          } catch (InterruptedException ex) {
149            SimpleRpcServer.LOG.debug("Interrupted while sleeping");
150            return;
151          }
152        }
153      } catch (Exception e) {
154        SimpleRpcServer.LOG
155          .warn(getName() + ": exception in Responder " + StringUtils.stringifyException(e), e);
156      }
157    }
158    SimpleRpcServer.LOG.info(getName() + ": stopped");
159  }
160
161  /**
162   * If there were some calls that have not been sent out for a long time, we close the connection.
163   * @return the time of the purge.
164   */
165  private long purge(long lastPurgeTime) {
166    long now = EnvironmentEdgeManager.currentTime();
167    if (now < lastPurgeTime + this.simpleRpcServer.purgeTimeout) {
168      return lastPurgeTime;
169    }
170
171    ArrayList<SimpleServerRpcConnection> conWithOldCalls = new ArrayList<>();
172    // get the list of channels from list of keys.
173    synchronized (writeSelector.keys()) {
174      for (SelectionKey key : writeSelector.keys()) {
175        SimpleServerRpcConnection connection = (SimpleServerRpcConnection) key.attachment();
176        if (connection == null) {
177          throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
178        }
179        if (
180          connection.lastSentTime > 0
181            && now > connection.lastSentTime + this.simpleRpcServer.purgeTimeout
182        ) {
183          conWithOldCalls.add(connection);
184        }
185      }
186    }
187
188    // Seems safer to close the connection outside of the synchronized loop...
189    for (SimpleServerRpcConnection connection : conWithOldCalls) {
190      this.simpleRpcServer.closeConnection(connection);
191    }
192
193    return now;
194  }
195
196  private void doAsyncWrite(SelectionKey key) throws IOException {
197    SimpleServerRpcConnection connection = (SimpleServerRpcConnection) key.attachment();
198    if (connection == null) {
199      throw new IOException("doAsyncWrite: no connection");
200    }
201    if (key.channel() != connection.channel) {
202      throw new IOException("doAsyncWrite: bad channel");
203    }
204
205    if (processAllResponses(connection)) {
206      try {
207        // We wrote everything, so we don't need to be told when the socket is ready for
208        // write anymore.
209        key.interestOps(0);
210      } catch (CancelledKeyException e) {
211        /*
212         * The Listener/reader might have closed the socket. We don't explicitly cancel the key, so
213         * not sure if this will ever fire. This warning could be removed.
214         */
215        SimpleRpcServer.LOG.warn("Exception while changing ops : " + e);
216      }
217    }
218  }
219
220  /**
221   * Process the response for this call. You need to have the lock on
222   * {@link org.apache.hadoop.hbase.ipc.SimpleServerRpcConnection#responseWriteLock}
223   * @return true if we proceed the call fully, false otherwise. n
224   */
225  private boolean processResponse(SimpleServerRpcConnection conn, RpcResponse resp)
226    throws IOException {
227    boolean error = true;
228    BufferChain buf = resp.getResponse();
229    try {
230      // Send as much data as we can in the non-blocking fashion
231      long numBytes = this.simpleRpcServer.channelWrite(conn.channel, buf);
232      if (numBytes < 0) {
233        throw new HBaseIOException("Error writing on the socket " + conn);
234      }
235      error = false;
236    } finally {
237      if (error) {
238        SimpleRpcServer.LOG.debug(conn + ": output error -- closing");
239        // We will be closing this connection itself. Mark this call as done so that all the
240        // buffer(s) it got from pool can get released
241        resp.done();
242        this.simpleRpcServer.closeConnection(conn);
243      }
244    }
245
246    if (!buf.hasRemaining()) {
247      resp.done();
248      return true;
249    } else {
250      // set the serve time when the response has to be sent later
251      conn.lastSentTime = EnvironmentEdgeManager.currentTime();
252      return false; // Socket can't take more, we will have to come back.
253    }
254  }
255
256  /**
257   * Process all the responses for this connection
258   * @return true if all the calls were processed or that someone else is doing it. false if there *
259   *         is still some work to do. In this case, we expect the caller to delay us. n
260   */
261  private boolean processAllResponses(final SimpleServerRpcConnection connection)
262    throws IOException {
263    // We want only one writer on the channel for a connection at a time.
264    connection.responseWriteLock.lock();
265    try {
266      for (int i = 0; i < 20; i++) {
267        // protection if some handlers manage to need all the responder
268        RpcResponse resp = connection.responseQueue.pollFirst();
269        if (resp == null) {
270          return true;
271        }
272        if (!processResponse(connection, resp)) {
273          connection.responseQueue.addFirst(resp);
274          return false;
275        }
276      }
277    } finally {
278      connection.responseWriteLock.unlock();
279    }
280
281    return connection.responseQueue.isEmpty();
282  }
283
284  //
285  // Enqueue a response from the application.
286  //
287  void doRespond(SimpleServerRpcConnection conn, RpcResponse resp) throws IOException {
288    boolean added = false;
289    // If there is already a write in progress, we don't wait. This allows to free the handlers
290    // immediately for other tasks.
291    if (conn.responseQueue.isEmpty() && conn.responseWriteLock.tryLock()) {
292      try {
293        if (conn.responseQueue.isEmpty()) {
294          // If we're alone, we can try to do a direct call to the socket. It's
295          // an optimization to save on context switches and data transfer between cores..
296          if (processResponse(conn, resp)) {
297            return; // we're done.
298          }
299          // Too big to fit, putting ahead.
300          conn.responseQueue.addFirst(resp);
301          added = true; // We will register to the selector later, outside of the lock.
302        }
303      } finally {
304        conn.responseWriteLock.unlock();
305      }
306    }
307
308    if (!added) {
309      conn.responseQueue.addLast(resp);
310    }
311    registerForWrite(conn);
312  }
313}