/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;

/**
 * Sends responses of RPC back to clients.
 * <p>
 * Runs as a daemon thread that owns a write {@link Selector}. Connections that still have
 * response data to send are handed over through {@link #registerForWrite}; the thread then
 * registers them for {@link SelectionKey#OP_WRITE} and drains their response queues whenever the
 * channel reports it is writable.
 */
@InterfaceAudience.Private
class SimpleRpcServerResponder extends Thread {

  private final SimpleRpcServer simpleRpcServer;
  private final Selector writeSelector;
  // Connections waiting to be (re-)registered on the write selector by the responder thread.
  private final Set<SimpleServerRpcConnection> writingCons =
      Collections.newSetFromMap(new ConcurrentHashMap<>());

  /**
   * Creates the responder thread (not started) and opens its write selector.
   * @param simpleRpcServer the server this responder reads its running flag, purge timeout and
   *          error handler from
   * @throws IOException if the selector cannot be opened
   */
  SimpleRpcServerResponder(SimpleRpcServer simpleRpcServer) throws IOException {
    this.simpleRpcServer = simpleRpcServer;
    this.setName("RpcServer.responder");
    this.setDaemon(true);
    this.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER);
    writeSelector = Selector.open(); // create a selector
  }

  /**
   * Runs the responder loop and closes the write selector when the loop ends, whatever the
   * reason.
   */
  @Override
  public void run() {
    SimpleRpcServer.LOG.debug(getName() + ": starting");
    try {
      doRunLoop();
    } finally {
      SimpleRpcServer.LOG.info(getName() + ": stopping");
      try {
        writeSelector.close();
      } catch (IOException ioe) {
        SimpleRpcServer.LOG.error(getName() + ": couldn't close write selector", ioe);
      }
    }
  }

  /**
   * Take the list of the connections that want to write, and register them in the selector.
   */
  private void registerWrites() {
    Iterator<SimpleServerRpcConnection> it = writingCons.iterator();
    while (it.hasNext()) {
      SimpleServerRpcConnection c = it.next();
      it.remove();
      SelectionKey sk = c.channel.keyFor(writeSelector);
      try {
        if (sk == null) {
          // First time this channel is seen by the write selector: register it.
          try {
            c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
          } catch (ClosedChannelException e) {
            // ignore: the client went away.
            if (SimpleRpcServer.LOG.isTraceEnabled()) {
              SimpleRpcServer.LOG.trace("ignored", e);
            }
          }
        } else {
          // Already registered: just ask to be told about writability again.
          sk.interestOps(SelectionKey.OP_WRITE);
        }
      } catch (CancelledKeyException e) {
        // ignore: the client went away.
        if (SimpleRpcServer.LOG.isTraceEnabled()) {
          SimpleRpcServer.LOG.trace("ignored", e);
        }
      }
    }
  }

  /**
   * Add a connection to the list that want to write, and wake the selector up so that the
   * responder thread picks the connection up on its next iteration.
   * @param c the connection that has pending response data
   */
  public void registerForWrite(SimpleServerRpcConnection c) {
    if (writingCons.add(c)) {
      writeSelector.wakeup();
    }
  }

  /**
   * Main loop: registers pending writers, waits for writable channels (at most
   * {@code purgeTimeout} milliseconds), writes what can be written, then purges stale
   * connections. Loops until the server is no longer running, the error handler asks to exit on
   * an {@link OutOfMemoryError}, or the thread is interrupted while backing off after one.
   */
  private void doRunLoop() {
    long lastPurgeTime = 0; // last check for old calls.
    while (this.simpleRpcServer.running) {
      try {
        registerWrites();
        int keyCt = writeSelector.select(this.simpleRpcServer.purgeTimeout);
        if (keyCt > 0) {
          Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
          while (iter.hasNext()) {
            SelectionKey key = iter.next();
            iter.remove();
            try {
              if (key.isValid() && key.isWritable()) {
                doAsyncWrite(key);
              }
            } catch (IOException e) {
              SimpleRpcServer.LOG.debug(getName() + ": asyncWrite", e);
            }
          }
        }

        // Purge even when no key was ready: a connection whose peer stopped reading never
        // becomes writable, so skipping the purge on an empty select would leave it (and its
        // queued responses) around for as long as nothing else is writable. purge() itself
        // rate-limits the actual scan to once per purgeTimeout.
        lastPurgeTime = purge(lastPurgeTime);

      } catch (OutOfMemoryError e) {
        if (this.simpleRpcServer.errorHandler != null) {
          if (this.simpleRpcServer.errorHandler.checkOOME(e)) {
            SimpleRpcServer.LOG.info(getName() + ": exiting on OutOfMemoryError");
            return;
          }
        } else {
          //
          // we can run out of memory if we have too many threads
          // log the event and sleep for a minute and give
          // some thread(s) a chance to finish
          //
          SimpleRpcServer.LOG.warn(getName() + ": OutOfMemoryError in server select", e);
          try {
            Thread.sleep(60000);
          } catch (InterruptedException ex) {
            SimpleRpcServer.LOG.debug("Interrupted while sleeping");
            return;
          }
        }
      } catch (Exception e) {
        SimpleRpcServer.LOG
            .warn(getName() + ": exception in Responder " + StringUtils.stringifyException(e), e);
      }
    }
    SimpleRpcServer.LOG.info(getName() + ": stopped");
  }

  /**
   * If there were some calls that have not been sent out for a long time, we close the connection.
   * Does nothing if less than {@code purgeTimeout} milliseconds elapsed since the last purge.
   * @param lastPurgeTime the time of the previous purge, as returned by the previous call
   * @return the time of the purge: the current time if a purge was done, {@code lastPurgeTime}
   *         otherwise.
   * @throws IllegalStateException if a registered key carries no connection attachment
   */
  private long purge(long lastPurgeTime) {
    long now = System.currentTimeMillis();
    if (now < lastPurgeTime + this.simpleRpcServer.purgeTimeout) {
      return lastPurgeTime;
    }

    ArrayList<SimpleServerRpcConnection> conWithOldCalls = new ArrayList<>();
    // get the list of channels from list of keys.
    synchronized (writeSelector.keys()) {
      for (SelectionKey key : writeSelector.keys()) {
        SimpleServerRpcConnection connection = (SimpleServerRpcConnection) key.attachment();
        if (connection == null) {
          throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
        }
        if (connection.lastSentTime > 0
            && now > connection.lastSentTime + this.simpleRpcServer.purgeTimeout) {
          conWithOldCalls.add(connection);
        }
      }
    }

    // Seems safer to close the connection outside of the synchronized loop...
    for (SimpleServerRpcConnection connection : conWithOldCalls) {
      this.simpleRpcServer.closeConnection(connection);
    }

    return now;
  }

  /**
   * Writes as many queued responses as possible for the connection attached to a writable key,
   * and clears the key's interest set once the queue has been fully drained.
   * @param key a writable selection key registered on the write selector
   * @throws IOException if the key has no attachment, if the key's channel is not the
   *           connection's channel, or if writing fails
   */
  private void doAsyncWrite(SelectionKey key) throws IOException {
    SimpleServerRpcConnection connection = (SimpleServerRpcConnection) key.attachment();
    if (connection == null) {
      throw new IOException("doAsyncWrite: no connection");
    }
    if (key.channel() != connection.channel) {
      throw new IOException("doAsyncWrite: bad channel");
    }

    if (processAllResponses(connection)) {
      try {
        // We wrote everything, so we don't need to be told when the socket is ready for
        // write anymore.
        key.interestOps(0);
      } catch (CancelledKeyException e) {
        /*
         * The Listener/reader might have closed the socket. We don't explicitly cancel the key, so
         * not sure if this will ever fire. This warning could be removed.
         */
        SimpleRpcServer.LOG.warn("Exception while changing ops : " + e);
      }
    }
  }

  /**
   * Process the response for this call. You need to have the lock on
   * {@link org.apache.hadoop.hbase.ipc.SimpleServerRpcConnection#responseWriteLock}
   * <p>
   * On any failure the response is marked done and the connection is closed before the exception
   * propagates.
   * @param conn the connection to write to
   * @param resp the response to send
   * @return true if we proceed the call fully, false otherwise.
   * @throws IOException if the write fails or reports a negative byte count
   */
  private boolean processResponse(SimpleServerRpcConnection conn, RpcResponse resp)
      throws IOException {
    boolean error = true;
    BufferChain buf = resp.getResponse();
    try {
      // Send as much data as we can in the non-blocking fashion
      long numBytes = this.simpleRpcServer.channelWrite(conn.channel, buf);
      if (numBytes < 0) {
        throw new HBaseIOException("Error writing on the socket " + conn);
      }
      error = false;
    } finally {
      if (error) {
        SimpleRpcServer.LOG.debug(conn + ": output error -- closing");
        // We will be closing this connection itself. Mark this call as done so that all the
        // buffer(s) it got from pool can get released
        resp.done();
        this.simpleRpcServer.closeConnection(conn);
      }
    }

    if (!buf.hasRemaining()) {
      resp.done();
      return true;
    } else {
      // set the serve time when the response has to be sent later
      conn.lastSentTime = System.currentTimeMillis();
      return false; // Socket can't take more, we will have to come back.
    }
  }

  /**
   * Process all the responses for this connection. At most 20 responses are written per call.
   * @param connection the connection whose response queue is drained
   * @return true if all the calls were processed or that someone else is doing it. false if
   *         there is still some work to do. In this case, we expect the caller to delay us.
   * @throws IOException if writing a response fails
   */
  private boolean processAllResponses(final SimpleServerRpcConnection connection)
      throws IOException {
    // We want only one writer on the channel for a connection at a time.
    connection.responseWriteLock.lock();
    try {
      for (int i = 0; i < 20; i++) {
        // protection if some handlers manage to need all the responder
        RpcResponse resp = connection.responseQueue.pollFirst();
        if (resp == null) {
          return true;
        }
        if (!processResponse(connection, resp)) {
          // Partially written: put it back at the head so it is resumed first.
          connection.responseQueue.addFirst(resp);
          return false;
        }
      }
    } finally {
      connection.responseWriteLock.unlock();
    }

    return connection.responseQueue.isEmpty();
  }

  /**
   * Enqueue a response from the application. If nothing is queued for the connection and the
   * write lock is free, the response is written directly from the calling thread; whatever could
   * not be written is queued and the connection is registered with the responder.
   * @param conn the connection to respond on
   * @param resp the response to send
   * @throws IOException if the direct write fails
   */
  void doRespond(SimpleServerRpcConnection conn, RpcResponse resp) throws IOException {
    boolean added = false;
    // If there is already a write in progress, we don't wait. This allows to free the handlers
    // immediately for other tasks.
    if (conn.responseQueue.isEmpty() && conn.responseWriteLock.tryLock()) {
      try {
        if (conn.responseQueue.isEmpty()) {
          // If we're alone, we can try to do a direct call to the socket. It's
          // an optimization to save on context switches and data transfer between cores..
          if (processResponse(conn, resp)) {
            return; // we're done.
          }
          // Too big to fit, putting ahead.
          conn.responseQueue.addFirst(resp);
          added = true; // We will register to the selector later, outside of the lock.
        }
      } finally {
        conn.responseWriteLock.unlock();
      }
    }

    if (!added) {
      conn.responseQueue.addLast(resp);
    }
    registerForWrite(conn);
  }
}