View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.ipc;
22  
23  import java.io.BufferedInputStream;
24  import java.io.BufferedOutputStream;
25  import java.io.DataInputStream;
26  import java.io.DataOutputStream;
27  import java.io.FilterInputStream;
28  import java.io.IOException;
29  import java.io.InputStream;
30  import java.net.ConnectException;
31  import java.net.InetSocketAddress;
32  import java.net.Socket;
33  import java.net.SocketException;
34  import java.net.SocketTimeoutException;
35  import java.net.UnknownHostException;
36  import java.util.Iterator;
37  import java.util.LinkedList;
38  import java.util.Map.Entry;
39  import java.util.concurrent.ConcurrentSkipListMap;
40  import java.util.concurrent.atomic.AtomicBoolean;
41  import java.util.concurrent.atomic.AtomicLong;
42  
43  import javax.net.SocketFactory;
44  
45  import org.apache.commons.logging.Log;
46  import org.apache.commons.logging.LogFactory;
47  import org.apache.hadoop.conf.Configuration;
48  import org.apache.hadoop.hbase.HConstants;
49  import org.apache.hadoop.hbase.security.User;
50  import org.apache.hadoop.hbase.util.Bytes;
51  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
52  import org.apache.hadoop.hbase.util.Pair;
53  import org.apache.hadoop.hbase.util.PoolMap;
54  import org.apache.hadoop.hbase.util.PoolMap.PoolType;
55  import org.apache.hadoop.io.DataOutputBuffer;
56  import org.apache.hadoop.io.IOUtils;
57  import org.apache.hadoop.io.Writable;
58  import org.apache.hadoop.io.WritableUtils;
59  import org.apache.hadoop.ipc.RemoteException;
60  import org.apache.hadoop.net.NetUtils;
61  import org.apache.hadoop.util.ReflectionUtils;
62  
63  /** A client for an IPC service.  IPC calls take a single {@link Writable} as a
64   * parameter, and return a {@link Writable} as their value.  A service runs on
65   * a port and is defined by a parameter class and a value class.
66   *
67   * <p>This is the org.apache.hadoop.ipc.Client renamed as HBaseClient and
68   * moved into this package so that it can access package-private methods.
69   *
70   * @see HBaseServer
71   */
72  public class HBaseClient {
73  
74    private static final Log LOG = LogFactory
75        .getLog("org.apache.hadoop.ipc.HBaseClient");
76    protected final PoolMap<ConnectionId, Connection> connections;
77  
78    protected final Class<? extends Writable> valueClass;   // class of call values
79    protected int counter;                            // counter for call ids
80    protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs
81    final protected Configuration conf;
82    final protected int maxIdleTime; // connections will be culled if it was idle for
83                             // maxIdleTime microsecs
84    final protected int maxRetries; //the max. no. of retries for socket connections
85    final protected long failureSleep; // Time to sleep before retry on failure.
86    protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
87    protected final boolean tcpKeepAlive; // if T then use keepalives
88    protected int pingInterval; // how often sends ping to the server in msecs
89    protected int socketTimeout; // socket timeout
90    protected final InetSocketAddress bindAddress; // address to bind to the client socket
91    protected FailedServers failedServers;
92  
93    protected final SocketFactory socketFactory;           // how to create sockets
94    protected String clusterId;
95  
96    final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
97    final private static String SOCKET_TIMEOUT = "ipc.socket.timeout";
98    final static int DEFAULT_PING_INTERVAL = 60000;  // 1 min
99    final static int DEFAULT_SOCKET_TIMEOUT = 20000; // 20 seconds
100   final static int PING_CALL_ID = -1;
101 
102   public final static String FAILED_SERVER_EXPIRY_KEY = "hbase.ipc.client.failed.servers.expiry";
103   public final static int FAILED_SERVER_EXPIRY_DEFAULT = 2000;
104 
105   /**
106    * A class to manage a list of servers that failed recently.
107    */
108   static class FailedServers {
109     private final LinkedList<Pair<Long, String>> failedServers = new
110         LinkedList<Pair<Long, String>>();
111     private final int recheckServersTimeout;
112 
113     FailedServers(Configuration conf) {
114       this.recheckServersTimeout = conf.getInt(
115           FAILED_SERVER_EXPIRY_KEY, FAILED_SERVER_EXPIRY_DEFAULT);
116     }
117 
118     /**
119      * Add an address to the list of the failed servers list.
120      */
121     public synchronized void addToFailedServers(InetSocketAddress address) {
122       final long expiry = EnvironmentEdgeManager.currentTimeMillis() + recheckServersTimeout;
123       failedServers.addFirst(new Pair<Long, String>(expiry, address.toString()));
124     }
125 
126     /**
127      * Check if the server should be considered as bad. Clean the old entries of the list.
128      *
129      * @return true if the server is in the failed servers list
130      */
131     public synchronized boolean isFailedServer(final InetSocketAddress address) {
132       if (failedServers.isEmpty()) {
133         return false;
134       }
135 
136       final String lookup = address.toString();
137       final long now = EnvironmentEdgeManager.currentTimeMillis();
138 
139       // iterate, looking for the search entry and cleaning expired entries
140       Iterator<Pair<Long, String>> it = failedServers.iterator();
141       while (it.hasNext()) {
142         Pair<Long, String> cur = it.next();
143         if (cur.getFirst() < now) {
144           it.remove();
145         } else {
146           if (lookup.equals(cur.getSecond())) {
147             return true;
148           }
149         }
150       }
151 
152       return false;
153     }
154 
155   }
156 
157   public static class FailedServerException extends IOException {
158     public FailedServerException(String s) {
159       super(s);
160     }
161   }
162 
163 
164   /**
165    * set the ping interval value in configuration
166    *
167    * @param conf Configuration
168    * @param pingInterval the ping interval
169    */
170   public static void setPingInterval(Configuration conf, int pingInterval) {
171     conf.setInt(PING_INTERVAL_NAME, pingInterval);
172   }
173 
174   /**
175    * Get the ping interval from configuration;
176    * If not set in the configuration, return the default value.
177    *
178    * @param conf Configuration
179    * @return the ping interval
180    */
181   static int getPingInterval(Configuration conf) {
182     return conf.getInt(PING_INTERVAL_NAME, DEFAULT_PING_INTERVAL);
183   }
184 
185   /**
186    * Set the socket timeout
187    * @param conf Configuration
188    * @param socketTimeout the socket timeout
189    */
190   public static void setSocketTimeout(Configuration conf, int socketTimeout) {
191     conf.setInt(SOCKET_TIMEOUT, socketTimeout);
192   }
193 
194   /**
195    * @return the socket timeout
196    */
197   static int getSocketTimeout(Configuration conf) {
198     return conf.getInt(SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT);
199   }
200 
201   /** A call waiting for a value. */
202   protected class Call {
203     final int id;                                       // call id
204     final Writable param;                               // parameter
205     Writable value;                               // value, null if error
206     IOException error;                            // exception, null if value
207     boolean done;                                 // true when call is done
208     long startTime;
209 
210     protected Call(Writable param) {
211       this.param = param;
212       this.startTime = System.currentTimeMillis();
213       synchronized (HBaseClient.this) {
214         this.id = counter++;
215       }
216     }
217 
218     /** Indicate when the call is complete and the
219      * value or error are available.  Notifies by default.  */
220     protected synchronized void callComplete() {
221       this.done = true;
222       notify();                                 // notify caller
223     }
224 
225     /** Set the exception when there is an error.
226      * Notify the caller the call is done.
227      *
228      * @param error exception thrown by the call; either local or remote
229      */
230     public synchronized void setException(IOException error) {
231       this.error = error;
232       callComplete();
233     }
234 
235     /** Set the return value when there is no error.
236      * Notify the caller the call is done.
237      *
238      * @param value return value of the call.
239      */
240     public synchronized void setValue(Writable value) {
241       this.value = value;
242       callComplete();
243     }
244 
245     public long getStartTime() {
246       return this.startTime;
247     }
248   }
249 
250   /**
251    * Creates a connection. Can be overridden by a subclass for testing.
252    *
253    * @param remoteId - the ConnectionId to use for the connection creation.
254    */
255   protected Connection createConnection(ConnectionId remoteId) throws IOException {
256     return new Connection(remoteId);
257   }
258 
259   /** Thread that reads responses and notifies callers.  Each connection owns a
260    * socket connected to a remote address.  Calls are multiplexed through this
261    * socket: responses may be delivered out of order. */
262   protected class Connection extends Thread {
263     private ConnectionHeader header;              // connection header
264     protected ConnectionId remoteId;
265     protected Socket socket = null;                 // connected socket
266     protected DataInputStream in;
267     protected DataOutputStream out;
268 
269     // currently active calls
270     protected final ConcurrentSkipListMap<Integer, Call> calls = new ConcurrentSkipListMap<Integer, Call>();
271     protected final AtomicLong lastActivity = new AtomicLong();// last I/O activity time
272     protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean();  // indicate if the connection is closed
273     protected IOException closeException; // close reason
274 
275     Connection(ConnectionId remoteId) throws IOException {
276       if (remoteId.getAddress().isUnresolved()) {
277         throw new UnknownHostException("unknown host: " +
278                                        remoteId.getAddress().getHostName());
279       }
280       this.remoteId = remoteId;
281       User ticket = remoteId.getTicket();
282       Class<? extends VersionedProtocol> protocol = remoteId.getProtocol();
283 
284       header = new ConnectionHeader(
285           protocol == null ? null : protocol.getName(), ticket);
286 
287       this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
288         remoteId.getAddress().toString() +
289         ((ticket==null)?" from an unknown user": (" from " + ticket.getName())));
290       this.setDaemon(true);
291     }
292 
293     /** Update lastActivity with the current time. */
294     protected void touch() {
295       lastActivity.set(System.currentTimeMillis());
296     }
297 
298     /**
299      * Add a call to this connection's call queue and notify
300      * a listener; synchronized. If the connection is dead, the call is not added, and the
301      * caller is notified.
302      * This function can return a connection that is already marked as 'shouldCloseConnection'
303      *  It is up to the user code to check this status.
304      * @param call to add
305      */
306     protected synchronized void addCall(Call call) {
307       // If the connection is about to close, we manage this as if the call was already added
308       //  to the connection calls list. If not, the connection creations are serialized, as
309       //  mentioned in HBASE-6364
310       if (this.shouldCloseConnection.get()) {
311         if (this.closeException == null) {
312           call.setException(new IOException(
313               "Call " + call.id + " not added as the connection " + remoteId + " is closing"));
314         } else {
315           call.setException(this.closeException);
316         }
317         synchronized (call) {
318           call.notifyAll();
319         }
320       } else {
321         calls.put(call.id, call);
322         notify();
323       }
324     }
325 
326     /** This class sends a ping to the remote side when timeout on
327      * reading. If no failure is detected, it retries until at least
328      * a byte is read.
329      */
330     protected class PingInputStream extends FilterInputStream {
331       /* constructor */
332       protected PingInputStream(InputStream in) {
333         super(in);
334       }
335 
336       /* Process timeout exception
337        * if the connection is not going to be closed, send a ping.
338        * otherwise, throw the timeout exception.
339        */
340       private void handleTimeout(SocketTimeoutException e) throws IOException {
341         if (shouldCloseConnection.get() || !running.get() ||
342             remoteId.rpcTimeout > 0) {
343           throw e;
344         }
345         sendPing();
346       }
347 
348       /** Read a byte from the stream.
349        * Send a ping if timeout on read. Retries if no failure is detected
350        * until a byte is read.
351        * @throws IOException for any IO problem other than socket timeout
352        */
353       @Override
354       public int read() throws IOException {
355         do {
356           try {
357             return super.read();
358           } catch (SocketTimeoutException e) {
359             handleTimeout(e);
360           }
361         } while (true);
362       }
363 
364       /** Read bytes into a buffer starting from offset <code>off</code>
365        * Send a ping if timeout on read. Retries if no failure is detected
366        * until a byte is read.
367        *
368        * @return the total number of bytes read; -1 if the connection is closed.
369        */
370       @Override
371       public int read(byte[] buf, int off, int len) throws IOException {
372         do {
373           try {
374             return super.read(buf, off, len);
375           } catch (SocketTimeoutException e) {
376             handleTimeout(e);
377           }
378         } while (true);
379       }
380     }
381 
382     protected synchronized void setupConnection() throws IOException {
383       short ioFailures = 0;
384       short timeoutFailures = 0;
385       while (true) {
386         try {
387           this.socket = socketFactory.createSocket();
388           this.socket.setTcpNoDelay(tcpNoDelay);
389           this.socket.setKeepAlive(tcpKeepAlive);
390           if (bindAddress != null) this.socket.bind(bindAddress);
391           // connection time out is 20s
392           NetUtils.connect(this.socket, remoteId.getAddress(),
393               getSocketTimeout(conf));
394           if (remoteId.rpcTimeout > 0) {
395             pingInterval = remoteId.rpcTimeout; // overwrite pingInterval
396           }
397           this.socket.setSoTimeout(pingInterval);
398           return;
399         } catch (SocketTimeoutException toe) {
400           /* The max number of retries is 45,
401            * which amounts to 20s*45 = 15 minutes retries.
402            */
403           handleConnectionFailure(timeoutFailures++, maxRetries, toe);
404         } catch (IOException ie) {
405           handleConnectionFailure(ioFailures++, maxRetries, ie);
406         }
407       }
408     }
409 
410     /** Connect to the server and set up the I/O streams. It then sends
411      * a header to the server and starts
412      * the connection thread that waits for responses.
413      * @throws java.io.IOException e
414      */
415     protected synchronized void setupIOstreams()
416         throws IOException, InterruptedException {
417 
418       if (socket != null || shouldCloseConnection.get()) {
419         return;
420       }
421 
422       if (failedServers.isFailedServer(remoteId.getAddress())) {
423         if (LOG.isDebugEnabled()) {
424           LOG.debug("Not trying to connect to " + remoteId.getAddress() +
425               " this server is in the failed servers list");
426         }
427         IOException e = new FailedServerException(
428             "This server is in the failed servers list: " + remoteId.getAddress());
429         markClosed(e);
430         close();
431         throw e;
432       }
433 
434       try {
435         if (LOG.isDebugEnabled()) {
436           LOG.debug("Connecting to "+remoteId);
437         }
438         setupConnection();
439         this.in = new DataInputStream(new BufferedInputStream
440             (new PingInputStream(NetUtils.getInputStream(socket))));
441         this.out = new DataOutputStream(new BufferedOutputStream(
442             NetUtils.getOutputStream(socket, pingInterval)));
443         writeHeader();
444 
445         // update last activity time
446         touch();
447 
448         // start the receiver thread after the socket connection has been set up
449         start();
450       } catch (Throwable t) {
451         failedServers.addToFailedServers(remoteId.address);
452         IOException e;
453         if (t instanceof IOException) {
454           e = (IOException)t;
455         } else {
456           e = new IOException("Could not set up IO Streams", t);
457         }
458         markClosed(e);
459         close();
460 
461         throw e;
462       }
463     }
464 
465     protected void closeConnection() {
466       // close the current connection
467       if (socket != null) {
468         try {
469           socket.close();
470         } catch (IOException e) {
471           LOG.warn("Not able to close a socket", e);
472         }
473       }
474       // set socket to null so that the next call to setupIOstreams
475       // can start the process of connect all over again.
476       socket = null;
477     }
478 
479     /**
480      *  Handle connection failures
481      *
482      * If the current number of retries is equal to the max number of retries,
483      * stop retrying and throw the exception; Otherwise backoff N seconds and
484      * try connecting again.
485      *
486      * This Method is only called from inside setupIOstreams(), which is
487      * synchronized. Hence the sleep is synchronized; the locks will be retained.
488      *
489      * @param curRetries current number of retries
490      * @param maxRetries max number of retries allowed
491      * @param ioe failure reason
492      * @throws IOException if max number of retries is reached
493      */
494     private void handleConnectionFailure(
495         int curRetries, int maxRetries, IOException ioe) throws IOException {
496 
497       closeConnection();
498 
499       // throw the exception if the maximum number of retries is reached
500       if (curRetries >= maxRetries) {
501         throw ioe;
502       }
503 
504       // otherwise back off and retry
505       try {
506         Thread.sleep(failureSleep);
507       } catch (InterruptedException ignored) {}
508 
509       LOG.info("Retrying connect to server: " + remoteId.getAddress() +
510         " after sleeping " + failureSleep + "ms. Already tried " + curRetries +
511         " time(s).");
512     }
513 
514     /* Write the header for each connection
515      * Out is not synchronized because only the first thread does this.
516      */
517     private void writeHeader() throws IOException {
518       out.write(HBaseServer.HEADER.array());
519       out.write(HBaseServer.CURRENT_VERSION);
520       //When there are more fields we can have ConnectionHeader Writable.
521       DataOutputBuffer buf = new DataOutputBuffer();
522       header.write(buf);
523 
524       int bufLen = buf.getLength();
525       out.writeInt(bufLen);
526       out.write(buf.getData(), 0, bufLen);
527     }
528 
529     /* wait till someone signals us to start reading RPC response or
530      * it is idle too long, it is marked as to be closed,
531      * or the client is marked as not running.
532      *
533      * Return true if it is time to read a response; false otherwise.
534      */
535     @SuppressWarnings({"ThrowableInstanceNeverThrown"})
536     protected synchronized boolean waitForWork() {
537       if (calls.isEmpty() && !shouldCloseConnection.get()  && running.get())  {
538         long timeout = maxIdleTime-
539               (System.currentTimeMillis()-lastActivity.get());
540         if (timeout>0) {
541           try {
542             wait(timeout);
543           } catch (InterruptedException ignored) {}
544         }
545       }
546 
547       if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
548         return true;
549       } else if (shouldCloseConnection.get()) {
550         return false;
551       } else if (calls.isEmpty()) { // idle connection closed or stopped
552         markClosed(null);
553         return false;
554       } else { // get stopped but there are still pending requests
555         markClosed((IOException)new IOException().initCause(
556             new InterruptedException()));
557         return false;
558       }
559     }
560 
561     public InetSocketAddress getRemoteAddress() {
562       return remoteId.getAddress();
563     }
564 
565     /* Send a ping to the server if the time elapsed
566      * since last I/O activity is equal to or greater than the ping interval
567      */
568     protected synchronized void sendPing() throws IOException {
569       long curTime = System.currentTimeMillis();
570       if ( curTime - lastActivity.get() >= pingInterval) {
571         lastActivity.set(curTime);
572         //noinspection SynchronizeOnNonFinalField
573         synchronized (this.out) {
574           out.writeInt(PING_CALL_ID);
575           out.flush();
576         }
577       }
578     }
579 
580     @Override
581     public void run() {
582       if (LOG.isDebugEnabled())
583         LOG.debug(getName() + ": starting, having connections "
584             + connections.size());
585 
586       try {
587         while (waitForWork()) {//wait here for work - read or close connection
588           receiveResponse();
589         }
590       } catch (Throwable t) {
591         LOG.warn("Unexpected exception receiving call responses", t);
592         markClosed(new IOException("Unexpected exception receiving call responses", t));
593       }
594 
595       close();
596 
597       if (LOG.isDebugEnabled())
598         LOG.debug(getName() + ": stopped, remaining connections "
599             + connections.size());
600     }
601 
602     /* Initiates a call by sending the parameter to the remote server.
603      * Note: this is not called from the Connection thread, but by other
604      * threads.
605      */
606     protected void sendParam(Call call) {
607       if (shouldCloseConnection.get()) {
608         return;
609       }
610 
611       // For serializing the data to be written.
612 
613       final DataOutputBuffer d = new DataOutputBuffer();
614       try {
615         if (LOG.isDebugEnabled())
616           LOG.debug(getName() + " sending #" + call.id);
617 
618         d.writeInt(0xdeadbeef); // placeholder for data length
619         d.writeInt(call.id);
620         call.param.write(d);
621         byte[] data = d.getData();
622         int dataLength = d.getLength();
623         // fill in the placeholder
624         Bytes.putInt(data, 0, dataLength - 4);
625         //noinspection SynchronizeOnNonFinalField
626         synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
627           out.write(data, 0, dataLength);
628           out.flush();
629         }
630       } catch(IOException e) {
631         synchronized (this) {
632           if (!shouldCloseConnection.get()) {
633             markClosed(e);
634             interrupt();
635           }
636         }
637       } finally {
638         //the buffer is just an in-memory buffer, but it is still polite to
639         // close early
640         IOUtils.closeStream(d);
641       }
642     }
643 
644     /* Receive a response.
645      * Because only one receiver, so no synchronization on in.
646      */
647     protected void receiveResponse() {
648       if (shouldCloseConnection.get()) {
649         return;
650       }
651       touch();
652 
653       try {
654         // See HBaseServer.Call.setResponse for where we write out the response.
655         // It writes the call.id (int), a flag byte, then optionally the length
656         // of the response (int) followed by data.
657 
658         // Read the call id.
659         int id = in.readInt();
660 
661         if (LOG.isDebugEnabled())
662           LOG.debug(getName() + " got value #" + id);
663         Call call = calls.get(id);
664 
665         // Read the flag byte
666         byte flag = in.readByte();
667         boolean isError = ResponseFlag.isError(flag);
668         if (ResponseFlag.isLength(flag)) {
669           // Currently length if present is unused.
670           in.readInt();
671         }
672         int state = in.readInt(); // Read the state.  Currently unused.
673         if (isError) {
674           if (call != null) {
675             //noinspection ThrowableInstanceNeverThrown
676             call.setException(new RemoteException(WritableUtils.readString(in),
677                 WritableUtils.readString(in)));
678           }
679         } else {
680           Writable value = ReflectionUtils.newInstance(valueClass, conf);
681           value.readFields(in);                 // read value
682           // it's possible that this call may have been cleaned up due to a RPC
683           // timeout, so check if it still exists before setting the value.
684           if (call != null) {
685             call.setValue(value);
686           }
687         }
688         calls.remove(id);
689       } catch (IOException e) {
690         if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) {
691           // Clean up open calls but don't treat this as a fatal condition,
692           // since we expect certain responses to not make it by the specified
693           // {@link ConnectionId#rpcTimeout}.
694           closeException = e;
695         } else {
696           // Since the server did not respond within the default ping interval
697           // time, treat this as a fatal condition and close this connection
698           markClosed(e);
699         }
700       } finally {
701         if (remoteId.rpcTimeout > 0) {
702           cleanupCalls(remoteId.rpcTimeout);
703         }
704       }
705     }
706 
707     protected synchronized void markClosed(IOException e) {
708       if (shouldCloseConnection.compareAndSet(false, true)) {
709         closeException = e;
710         notifyAll();
711       }
712     }
713 
714     /** Close the connection. */
715     protected synchronized void close() {
716       if (!shouldCloseConnection.get()) {
717         LOG.error("The connection is not in the closed state");
718         return;
719       }
720 
721       // release the resources
722       // first thing to do;take the connection out of the connection list
723       synchronized (connections) {
724         connections.remove(remoteId, this);
725       }
726 
727       // close the streams and therefore the socket
728       IOUtils.closeStream(out);
729       IOUtils.closeStream(in);
730 
731       // clean up all calls
732       if (closeException == null) {
733         if (!calls.isEmpty()) {
734           LOG.warn(
735               "A connection is closed for no cause and calls are not empty");
736 
737           // clean up calls anyway
738           closeException = new IOException("Unexpected closed connection");
739           cleanupCalls();
740         }
741       } else {
742         // log the info
743         if (LOG.isDebugEnabled()) {
744           LOG.debug("closing ipc connection to " + remoteId.address + ": " +
745               closeException.getMessage(),closeException);
746         }
747 
748         // cleanup calls
749         cleanupCalls();
750       }
751       if (LOG.isDebugEnabled())
752         LOG.debug(getName() + ": closed");
753     }
754 
755     /* Cleanup all calls and mark them as done */
756     protected void cleanupCalls() {
757       cleanupCalls(0);
758     }
759 
760     protected void cleanupCalls(long rpcTimeout) {
761       Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
762       while (itor.hasNext()) {
763         Call c = itor.next().getValue();
764         long waitTime = System.currentTimeMillis() - c.getStartTime();
765         if (waitTime >= rpcTimeout) {
766           if (this.closeException == null) {
767             // There may be no exception in the case that there are many calls
768             // being multiplexed over this connection and these are succeeding
769             // fine while this Call object is taking a long time to finish
770             // over on the server; e.g. I just asked the regionserver to bulk
771             // open 3k regions or its a big fat multiput into a heavily-loaded
772             // server (Perhaps this only happens at the extremes?)
773             this.closeException = new CallTimeoutException("Call id=" + c.id +
774               ", waitTime=" + waitTime + ", rpcTimetout=" + rpcTimeout);
775           }
776           c.setException(this.closeException);
777           synchronized (c) {
778             c.notifyAll();
779           }
780           itor.remove();
781         } else {
782           break;
783         }
784       }
785       try {
786         if (!calls.isEmpty()) {
787           Call firstCall = calls.get(calls.firstKey());
788           long maxWaitTime = System.currentTimeMillis() - firstCall.getStartTime();
789           if (maxWaitTime < rpcTimeout) {
790             rpcTimeout -= maxWaitTime;
791           }
792         }
793         if (!shouldCloseConnection.get()) {
794           closeException = null;
795           if (socket != null) {
796             socket.setSoTimeout((int) rpcTimeout);
797           }
798         }
799       } catch (SocketException e) {
800         LOG.debug("Couldn't lower timeout, which may result in longer than expected calls");
801       }
802     }
803   }
804 
805   /**
806    * Client-side call timeout
807    */
808   public static class CallTimeoutException extends IOException {
809     public CallTimeoutException(final String msg) {
810       super(msg);
811     }
812   }
813 
814   /** Call implementation used for parallel calls. */
815   protected class ParallelCall extends Call {
816     private final ParallelResults results;
817     protected final int index;
818 
819     public ParallelCall(Writable param, ParallelResults results, int index) {
820       super(param);
821       this.results = results;
822       this.index = index;
823     }
824 
825     /** Deliver result to result collector. */
826     @Override
827     protected void callComplete() {
828       results.callComplete(this);
829     }
830   }
831 
832   /** Result collector for parallel calls. */
833   protected static class ParallelResults {
834     protected final Writable[] values;
835     protected int size;
836     protected int count;
837 
838     public ParallelResults(int size) {
839       this.values = new Writable[size];
840       this.size = size;
841     }
842 
843     /*
844      * Collect a result.
845      */
846     synchronized void callComplete(ParallelCall call) {
847       // FindBugs IS2_INCONSISTENT_SYNC
848       values[call.index] = call.value;            // store the value
849       count++;                                    // count it
850       if (count == size)                          // if all values are in
851         notify();                                 // then notify waiting caller
852     }
853   }
854 
855   /**
856    * Construct an IPC client whose values are of the given {@link Writable}
857    * class.
858    * @param valueClass value class
859    * @param conf configuration
860    * @param factory socket factory
861    */
862   public HBaseClient(Class<? extends Writable> valueClass, Configuration conf,
863       SocketFactory factory) {
864     this.valueClass = valueClass;
865     this.maxIdleTime =
866       conf.getInt("hbase.ipc.client.connection.maxidletime", 10000); //10s
867     this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
868     this.failureSleep = conf.getInt("hbase.client.pause", 1000);
869     this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", false);
870     this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
871     this.pingInterval = getPingInterval(conf);
872     if (LOG.isDebugEnabled()) {
873       LOG.debug("The ping interval is" + this.pingInterval + "ms.");
874     }
875     this.conf = conf;
876     this.socketFactory = factory;
877     this.clusterId = conf.get(HConstants.CLUSTER_ID, "default");
878     this.connections = new PoolMap<ConnectionId, Connection>(
879         getPoolType(conf), getPoolSize(conf));
880     String hostName = this.conf.get("hbase.regionserver.rpc.client.socket.bind.address");
881     if (hostName != null) {
882       this.bindAddress = new InetSocketAddress(hostName, 0);
883     } else {
884       this.bindAddress = null;
885     }
886     this.failedServers = new FailedServers(conf);
887   }
888 
889   /**
890    * Construct an IPC client with the default SocketFactory
891    * @param valueClass value class
892    * @param conf configuration
893    */
894   public HBaseClient(Class<? extends Writable> valueClass, Configuration conf) {
895     this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf));
896   }
897 
898   /**
899    * Return the pool type specified in the configuration, which must be set to
900    * either {@link PoolType#RoundRobin} or {@link PoolType#ThreadLocal},
901    * otherwise default to the former.
902    *
903    * For applications with many user threads, use a small round-robin pool. For
904    * applications with few user threads, you may want to try using a
905    * thread-local pool. In any case, the number of {@link HBaseClient} instances
906    * should not exceed the operating system's hard limit on the number of
907    * connections.
908    *
909    * @param config configuration
910    * @return either a {@link PoolType#RoundRobin} or
911    *         {@link PoolType#ThreadLocal}
912    */
913   protected static PoolType getPoolType(Configuration config) {
914     return PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE),
915         PoolType.RoundRobin, PoolType.ThreadLocal);
916   }
917 
918   /**
919    * Return the pool size specified in the configuration, which is applicable only if
920    * the pool type is {@link PoolType#RoundRobin}.
921    *
922    * @param config
923    * @return the maximum pool size
924    */
925   protected static int getPoolSize(Configuration config) {
926     return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
927   }
928 
929   /** Return the socket factory of this client
930    *
931    * @return this client's socket factory
932    */
933   SocketFactory getSocketFactory() {
934     return socketFactory;
935   }
936 
937   /** Stop all threads related to this client.  No further calls may be made
938    * using this client. */
939   public void stop() {
940     if (LOG.isDebugEnabled()) {
941       LOG.debug("Stopping client");
942     }
943 
944     if (!running.compareAndSet(true, false)) {
945       return;
946     }
947 
948     // wake up all connections
949     synchronized (connections) {
950       for (Connection conn : connections.values()) {
951         conn.interrupt();
952       }
953     }
954 
955     // wait until all connections are closed
956     while (!connections.isEmpty()) {
957       try {
958         Thread.sleep(100);
959       } catch (InterruptedException ignored) {
960       }
961     }
962   }
963 
964   /** Make a call, passing <code>param</code>, to the IPC server running at
965    * <code>address</code>, returning the value.  Throws exceptions if there are
966    * network problems or if the remote code threw an exception.
967    * @param param writable parameter
968    * @param address network address
969    * @return Writable
970    * @throws IOException e
971    */
972   public Writable call(Writable param, InetSocketAddress address)
973   throws IOException, InterruptedException {
974       return call(param, address, null, 0);
975   }
976 
977   public Writable call(Writable param, InetSocketAddress addr,
978                        User ticket, int rpcTimeout)
979                        throws IOException, InterruptedException {
980     return call(param, addr, null, ticket, rpcTimeout);
981   }
982 
983   /** Make a call, passing <code>param</code>, to the IPC server running at
984    * <code>address</code> which is servicing the <code>protocol</code> protocol,
985    * with the <code>ticket</code> credentials, returning the value.
986    * Throws exceptions if there are network problems or if the remote code
987    * threw an exception. */
988   public Writable call(Writable param, InetSocketAddress addr,
989                        Class<? extends VersionedProtocol> protocol,
990                        User ticket, int rpcTimeout)
991       throws InterruptedException, IOException {
992     Call call = new Call(param);
993     Connection connection = getConnection(addr, protocol, ticket, rpcTimeout, call);
994     connection.sendParam(call);                 // send the parameter
995     boolean interrupted = false;
996     //noinspection SynchronizationOnLocalVariableOrMethodParameter
997     synchronized (call) {
998       while (!call.done) {
999         if (connection.shouldCloseConnection.get()) {
1000           throw new IOException("Unexpected closed connection");
1001         }
1002         try {
1003           call.wait(1000);                       // wait for the result
1004         } catch (InterruptedException ignored) {
1005           // save the fact that we were interrupted
1006           interrupted = true;
1007         }
1008       }
1009 
1010       if (interrupted) {
1011         // set the interrupt flag now that we are done waiting
1012         Thread.currentThread().interrupt();
1013       }
1014 
1015       if (call.error != null) {
1016         if (call.error instanceof RemoteException) {
1017           call.error.fillInStackTrace();
1018           throw call.error;
1019         }
1020         // local exception
1021         throw wrapException(addr, call.error);
1022       }
1023       return call.value;
1024     }
1025   }
1026 
1027   /**
1028    * Take an IOException and the address we were trying to connect to
1029    * and return an IOException with the input exception as the cause.
1030    * The new exception provides the stack trace of the place where
1031    * the exception is thrown and some extra diagnostics information.
1032    * If the exception is ConnectException or SocketTimeoutException,
1033    * return a new one of the same type; Otherwise return an IOException.
1034    *
1035    * @param addr target address
1036    * @param exception the relevant exception
1037    * @return an exception to throw
1038    */
1039   @SuppressWarnings({"ThrowableInstanceNeverThrown"})
1040   protected IOException wrapException(InetSocketAddress addr,
1041                                          IOException exception) {
1042     if (exception instanceof ConnectException) {
1043       //connection refused; include the host:port in the error
1044       return (ConnectException)new ConnectException(
1045            "Call to " + addr + " failed on connection exception: " + exception)
1046                     .initCause(exception);
1047     } else if (exception instanceof SocketTimeoutException) {
1048       return (SocketTimeoutException)new SocketTimeoutException(
1049            "Call to " + addr + " failed on socket timeout exception: "
1050                       + exception).initCause(exception);
1051     } else {
1052       return (IOException)new IOException(
1053            "Call to " + addr + " failed on local exception: " + exception)
1054                                  .initCause(exception);
1055 
1056     }
1057   }
1058 
1059   /** Makes a set of calls in parallel.  Each parameter is sent to the
1060    * corresponding address.  When all values are available, or have timed out
1061    * or errored, the collected results are returned in an array.  The array
1062    * contains nulls for calls that timed out or errored.
1063    * @param params writable parameters
1064    * @param addresses socket addresses
1065    * @return  Writable[]
1066    * @throws IOException e
1067    * @deprecated Use {@link #call(Writable[], InetSocketAddress[], Class, User)} instead
1068    */
1069   @Deprecated
1070   public Writable[] call(Writable[] params, InetSocketAddress[] addresses)
1071     throws IOException, InterruptedException {
1072     return call(params, addresses, null, null);
1073   }
1074 
1075   /** Makes a set of calls in parallel.  Each parameter is sent to the
1076    * corresponding address.  When all values are available, or have timed out
1077    * or errored, the collected results are returned in an array.  The array
1078    * contains nulls for calls that timed out or errored.  */
1079   public Writable[] call(Writable[] params, InetSocketAddress[] addresses,
1080                          Class<? extends VersionedProtocol> protocol,
1081                          User ticket)
1082       throws IOException, InterruptedException {
1083     if (addresses.length == 0) return new Writable[0];
1084 
1085     ParallelResults results = new ParallelResults(params.length);
1086     // TODO this synchronization block doesnt make any sense, we should possibly fix it
1087     //noinspection SynchronizationOnLocalVariableOrMethodParameter
1088     synchronized (results) {
1089       for (int i = 0; i < params.length; i++) {
1090         ParallelCall call = new ParallelCall(params[i], results, i);
1091         try {
1092           Connection connection =
1093               getConnection(addresses[i], protocol, ticket, 0, call);
1094           connection.sendParam(call);             // send each parameter
1095         } catch (IOException e) {
1096           // log errors
1097           LOG.info("Calling "+addresses[i]+" caught: " +
1098                    e.getMessage(),e);
1099           results.size--;                         //  wait for one fewer result
1100         }
1101       }
1102       while (results.count != results.size) {
1103         try {
1104           results.wait();                    // wait for all results
1105         } catch (InterruptedException ignored) {}
1106       }
1107 
1108       return results.values;
1109     }
1110   }
1111 
1112   /* Get a connection from the pool, or create a new one and add it to the
1113    * pool.  Connections to a given host/port are reused. */
1114   protected Connection getConnection(InetSocketAddress addr,
1115                                    Class<? extends VersionedProtocol> protocol,
1116                                    User ticket,
1117                                    int rpcTimeout,
1118                                    Call call)
1119                                    throws IOException, InterruptedException {
1120     if (!running.get()) {
1121       // the client is stopped
1122       throw new IOException("The client is stopped");
1123     }
1124     Connection connection;
1125     /* we could avoid this allocation for each RPC by having a
1126      * connectionsId object and with set() method. We need to manage the
1127      * refs for keys in HashMap properly. For now its ok.
1128      */
1129     ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout);
1130     synchronized (connections) {
1131       connection = connections.get(remoteId);
1132       if (connection == null) {
1133         connection = createConnection(remoteId);
1134         connections.put(remoteId, connection);
1135       }
1136     }
1137     connection.addCall(call);
1138 
1139     //we don't invoke the method below inside "synchronized (connections)"
1140     //block above. The reason for that is if the server happens to be slow,
1141     //it will take longer to establish a connection and that will slow the
1142     //entire system down.
1143     //Moreover, if the connection is currently created, there will be many threads
1144     // waiting here; as setupIOstreams is synchronized. If the connection fails with a
1145     // timeout, they will all fail simultaneously. This is checked in setupIOstreams.
1146     connection.setupIOstreams();
1147     return connection;
1148   }
1149 
1150   /**
1151    * This class holds the address and the user ticket. The client connections
1152    * to servers are uniquely identified by <remoteAddress, ticket>
1153    */
1154   protected static class ConnectionId {
1155     final InetSocketAddress address;
1156     final User ticket;
1157     final int rpcTimeout;
1158     Class<? extends VersionedProtocol> protocol;
1159     private static final int PRIME = 16777619;
1160 
1161     ConnectionId(InetSocketAddress address,
1162         Class<? extends VersionedProtocol> protocol,
1163         User ticket,
1164         int rpcTimeout) {
1165       this.protocol = protocol;
1166       this.address = address;
1167       this.ticket = ticket;
1168       this.rpcTimeout = rpcTimeout;
1169     }
1170 
1171     InetSocketAddress getAddress() {
1172       return address;
1173     }
1174 
1175     Class<? extends VersionedProtocol> getProtocol() {
1176       return protocol;
1177     }
1178 
1179     User getTicket() {
1180       return ticket;
1181     }
1182 
1183     @Override
1184     public boolean equals(Object obj) {
1185      if (obj instanceof ConnectionId) {
1186        ConnectionId id = (ConnectionId) obj;
1187        return address.equals(id.address) && protocol == id.protocol &&
1188               ((ticket != null && ticket.equals(id.ticket)) ||
1189                (ticket == id.ticket)) && rpcTimeout == id.rpcTimeout;
1190      }
1191      return false;
1192     }
1193 
1194     @Override  // simply use the default Object#hashcode() ?
1195     public int hashCode() {
1196       return (address.hashCode() + PRIME * (
1197                   PRIME * System.identityHashCode(protocol) ^
1198              (ticket == null ? 0 : ticket.hashCode()) )) ^ rpcTimeout;
1199     }
1200   }
1201 
1202   /**
1203    * @return the clusterId
1204    */
1205   public String getClusterId() {
1206     return clusterId;
1207   }
1208 }