View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import com.google.common.annotations.VisibleForTesting;
22  import com.google.protobuf.ByteString;
23  import com.google.protobuf.ServiceException;
24
25  import java.io.Closeable;
26  import java.io.IOException;
27  import java.io.InterruptedIOException;
28  import java.util.ArrayList;
29  import java.util.Arrays;
30  import java.util.HashMap;
31  import java.util.LinkedList;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.Map.Entry;
35  import java.util.concurrent.ExecutionException;
36  import java.util.concurrent.Future;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.TimeoutException;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.concurrent.atomic.AtomicReference;
41  import java.util.regex.Pattern;
42
43  import org.apache.commons.logging.Log;
44  import org.apache.commons.logging.LogFactory;
45  import org.apache.hadoop.conf.Configuration;
46  import org.apache.hadoop.hbase.Abortable;
47  import org.apache.hadoop.hbase.ClusterStatus;
48  import org.apache.hadoop.hbase.DoNotRetryIOException;
49  import org.apache.hadoop.hbase.HBaseConfiguration;
50  import org.apache.hadoop.hbase.HColumnDescriptor;
51  import org.apache.hadoop.hbase.HConstants;
52  import org.apache.hadoop.hbase.HRegionInfo;
53  import org.apache.hadoop.hbase.HRegionLocation;
54  import org.apache.hadoop.hbase.HTableDescriptor;
55  import org.apache.hadoop.hbase.MasterNotRunningException;
56  import org.apache.hadoop.hbase.MetaTableAccessor;
57  import org.apache.hadoop.hbase.NamespaceDescriptor;
58  import org.apache.hadoop.hbase.NotServingRegionException;
59  import org.apache.hadoop.hbase.ProcedureInfo;
60  import org.apache.hadoop.hbase.ProcedureUtil;
61  import org.apache.hadoop.hbase.RegionLocations;
62  import org.apache.hadoop.hbase.ServerName;
63  import org.apache.hadoop.hbase.TableExistsException;
64  import org.apache.hadoop.hbase.TableName;
65  import org.apache.hadoop.hbase.TableNotDisabledException;
66  import org.apache.hadoop.hbase.TableNotFoundException;
67  import org.apache.hadoop.hbase.UnknownRegionException;
68  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
69  import org.apache.hadoop.hbase.classification.InterfaceAudience;
70  import org.apache.hadoop.hbase.classification.InterfaceStability;
71  import org.apache.hadoop.hbase.client.security.SecurityCapability;
72  import org.apache.hadoop.hbase.exceptions.DeserializationException;
73  import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
74  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
75  import org.apache.hadoop.hbase.ipc.MasterCoprocessorRpcChannel;
76  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
77  import org.apache.hadoop.hbase.ipc.RegionServerCoprocessorRpcChannel;
78  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
79  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
80  import org.apache.hadoop.hbase.protobuf.RequestConverter;
81  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
82  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
83  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
84  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
85  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
86  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
87  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
88  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
89  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
90  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
91  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
92  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
93  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
94  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
95  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
96  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
97  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema;
98  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
99  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AbortProcedureRequest;
100 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AbortProcedureResponse;
101 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
102 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
103 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
104 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest;
105 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceResponse;
106 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest;
107 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableResponse;
108 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnRequest;
109 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnResponse;
110 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
111 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
112 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
113 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableRequest;
114 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableResponse;
115 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableRequest;
116 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableResponse;
117 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest;
118 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableRequest;
119 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableResponse;
120 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest;
121 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse;
122 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusRequest;
123 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
124 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
125 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultRequest;
126 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultResponse;
127 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
128 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
129 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
130 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
131 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesRequest;
132 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
133 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
134 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
135 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
136 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
137 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListProceduresRequest;
138 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
139 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
140 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
141 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
142 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnRequest;
143 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnResponse;
144 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
145 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
146 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableRequest;
147 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableResponse;
148 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionRequest;
149 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
150 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
151 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
152 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
153 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
154 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownRequest;
155 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest;
156 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse;
157 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterRequest;
158 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableRequest;
159 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableResponse;
160 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest;
161 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
162 import org.apache.hadoop.hbase.quotas.QuotaFilter;
163 import org.apache.hadoop.hbase.quotas.QuotaRetriever;
164 import org.apache.hadoop.hbase.quotas.QuotaSettings;
165 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
166 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
167 import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
168 import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
169 import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
170 import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
171 import org.apache.hadoop.hbase.util.Addressing;
172 import org.apache.hadoop.hbase.util.Bytes;
173 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
174 import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
175 import org.apache.hadoop.hbase.util.Pair;
176 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
177 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
178 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
179 import org.apache.hadoop.ipc.RemoteException;
180 import org.apache.hadoop.util.StringUtils;
181 import org.apache.zookeeper.KeeperException;
182
183 /**
184  * HBaseAdmin is no longer a client API. It is marked InterfaceAudience.Private indicating that
185  * this is an HBase-internal class as defined in
186  * https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html
187  * There are no guarantees for backwards source / binary compatibility and methods or class can
188  * change or go away without deprecation.
189  * Use {@link Connection#getAdmin()} to obtain an instance of {@link Admin} instead of constructing
190  * an HBaseAdmin directly.
191  *
192  * <p>Connection should be an <i>unmanaged</i> connection obtained via
193  * {@link ConnectionFactory#createConnection(Configuration)}
194  *
195  * @see ConnectionFactory
196  * @see Connection
197  * @see Admin
198  */
199 @InterfaceAudience.Private
200 @InterfaceStability.Evolving
201 public class HBaseAdmin implements Admin {
202   private static final Log LOG = LogFactory.getLog(HBaseAdmin.class);
203
204   private static final String ZK_IDENTIFIER_PREFIX =  "hbase-admin-on-";
205
206   private ClusterConnection connection;
207
208   private volatile Configuration conf;
209   private final long pause;
210   private final int numRetries;
211   // Some operations can take a long time such as disable of big table.
212   // numRetries is for 'normal' stuff... Multiply by this factor when
213   // want to wait a long time.
214   private final int retryLongerMultiplier;
215   private final int syncWaitTimeout;
216   private boolean aborted;
217   private int operationTimeout;
218   private int rpcTimeout;
219
220   private RpcRetryingCallerFactory rpcCallerFactory;
221   private RpcControllerFactory rpcControllerFactory;
222
223   private NonceGenerator ng;
224
225   @Override
226   public int getOperationTimeout() {
227     return operationTimeout;
228   }
229
230   HBaseAdmin(ClusterConnection connection) throws IOException {
231     this.conf = connection.getConfiguration();
232     this.connection = connection;
233
234     // TODO: receive ConnectionConfiguration here rather than re-parsing these configs every time.
235     this.pause = this.conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
236         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
237     this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
238         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
239     this.retryLongerMultiplier = this.conf.getInt(
240         "hbase.client.retries.longer.multiplier", 10);
241     this.operationTimeout = this.conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
242         HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
243     this.rpcTimeout = this.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
244         HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
245     this.syncWaitTimeout = this.conf.getInt(
246       "hbase.client.sync.wait.timeout.msec", 10 * 60000); // 10min
247
248     this.rpcCallerFactory = connection.getRpcRetryingCallerFactory();
249     this.rpcControllerFactory = connection.getRpcControllerFactory();
250
251     this.ng = this.connection.getNonceGenerator();
252   }
253
254   @Override
255   public void abort(String why, Throwable e) {
256     // Currently does nothing but throw the passed message and exception
257     this.aborted = true;
258     throw new RuntimeException(why, e);
259   }
260
261   @Override
262   public boolean isAborted(){
263     return this.aborted;
264   }
265
266   @Override
267   public boolean abortProcedure(final long procId, final boolean mayInterruptIfRunning)
268   throws IOException {
269     return get(abortProcedureAsync(procId, mayInterruptIfRunning), this.syncWaitTimeout,
270       TimeUnit.MILLISECONDS);
271   }
272
273   @Override
274   public Future<Boolean> abortProcedureAsync(
275       final long procId,
276       final boolean mayInterruptIfRunning) throws IOException {
277     Boolean abortProcResponse = executeCallable(
278       new MasterCallable<AbortProcedureResponse>(getConnection()) {
279         @Override
280         public AbortProcedureResponse call(int callTimeout) throws ServiceException {
281           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
282           controller.setCallTimeout(callTimeout);
283           AbortProcedureRequest abortProcRequest =
284               AbortProcedureRequest.newBuilder().setProcId(procId).build();
285           return master.abortProcedure(controller, abortProcRequest);
286         }
287       }).getIsProcedureAborted();
288
289     AbortProcedureFuture abortProcFuture =
290         new AbortProcedureFuture(this, procId, abortProcResponse);
291     return abortProcFuture;
292   }
293
294   private static class AbortProcedureFuture extends ProcedureFuture<Boolean> {
295     private boolean isAbortInProgress;
296
297     public AbortProcedureFuture(
298         final HBaseAdmin admin,
299         final Long procId,
300         final Boolean abortProcResponse) {
301       super(admin, procId);
302       this.isAbortInProgress = abortProcResponse;
303     }
304
305     @Override
306     public Boolean get(long timeout, TimeUnit unit)
307         throws InterruptedException, ExecutionException, TimeoutException {
308       if (!this.isAbortInProgress) {
309         return false;
310       }
311       super.get(timeout, unit);
312       return true;
313     }
314   }
315
316   /** @return Connection used by this object. */
317   @Override
318   public Connection getConnection() {
319     return connection;
320   }
321
322   @Override
323   public boolean tableExists(final TableName tableName) throws IOException {
324     return executeCallable(new ConnectionCallable<Boolean>(getConnection()) {
325       @Override
326       public Boolean call(int callTimeout) throws ServiceException, IOException {
327         return MetaTableAccessor.tableExists(connection, tableName);
328       }
329     });
330   }
331
332   @Override
333   public HTableDescriptor[] listTables() throws IOException {
334     return listTables((Pattern)null, false);
335   }
336
337   @Override
338   public HTableDescriptor[] listTables(Pattern pattern) throws IOException {
339     return listTables(pattern, false);
340   }
341
342   @Override
343   public HTableDescriptor[] listTables(String regex) throws IOException {
344     return listTables(Pattern.compile(regex), false);
345   }
346
347   @Override
348   public HTableDescriptor[] listTables(final Pattern pattern, final boolean includeSysTables)
349       throws IOException {
350     return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) {
351       @Override
352       public HTableDescriptor[] call(int callTimeout) throws ServiceException {
353         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
354         controller.setCallTimeout(callTimeout);
355         GetTableDescriptorsRequest req =
356             RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables);
357         return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(controller, req));
358       }
359     });
360   }
361
362   @Override
363   public HTableDescriptor[] listTables(String regex, boolean includeSysTables)
364       throws IOException {
365     return listTables(Pattern.compile(regex), includeSysTables);
366   }
367
368   @Override
369   public TableName[] listTableNames() throws IOException {
370     return listTableNames((Pattern)null, false);
371   }
372
373   @Override
374   public TableName[] listTableNames(Pattern pattern) throws IOException {
375     return listTableNames(pattern, false);
376   }
377
378   @Override
379   public TableName[] listTableNames(String regex) throws IOException {
380     return listTableNames(Pattern.compile(regex), false);
381   }
382
383   @Override
384   public TableName[] listTableNames(final Pattern pattern, final boolean includeSysTables)
385       throws IOException {
386     return executeCallable(new MasterCallable<TableName[]>(getConnection()) {
387       @Override
388       public TableName[] call(int callTimeout) throws ServiceException {
389         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
390         controller.setCallTimeout(callTimeout);
391         GetTableNamesRequest req =
392             RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables);
393         return ProtobufUtil.getTableNameArray(master.getTableNames(controller, req)
394             .getTableNamesList());
395       }
396     });
397   }
398
399   @Override
400   public TableName[] listTableNames(final String regex, final boolean includeSysTables)
401       throws IOException {
402     return listTableNames(Pattern.compile(regex), includeSysTables);
403   }
404
405   @Override
406   public HTableDescriptor getTableDescriptor(final TableName tableName) throws IOException {
407     return getTableDescriptor(tableName, getConnection(), rpcCallerFactory, rpcControllerFactory,
408        operationTimeout, rpcTimeout);
409   }
410
411   static HTableDescriptor getTableDescriptor(final TableName tableName, Connection connection,
412       RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory,
413       int operationTimeout, int rpcTimeout) throws IOException {
414       if (tableName == null) return null;
415       HTableDescriptor htd = executeCallable(new MasterCallable<HTableDescriptor>(connection) {
416         @Override
417         public HTableDescriptor call(int callTimeout) throws ServiceException {
418           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
419           controller.setCallTimeout(callTimeout);
420           GetTableDescriptorsResponse htds;
421           GetTableDescriptorsRequest req =
422                   RequestConverter.buildGetTableDescriptorsRequest(tableName);
423           htds = master.getTableDescriptors(controller, req);
424
425           if (!htds.getTableSchemaList().isEmpty()) {
426             return ProtobufUtil.convertToHTableDesc(htds.getTableSchemaList().get(0));
427           }
428           return null;
429         }
430       }, rpcCallerFactory, operationTimeout, rpcTimeout);
431       if (htd != null) {
432         return htd;
433       }
434       throw new TableNotFoundException(tableName.getNameAsString());
435   }
436
437   private long getPauseTime(int tries) {
438     int triesCount = tries;
439     if (triesCount >= HConstants.RETRY_BACKOFF.length) {
440       triesCount = HConstants.RETRY_BACKOFF.length - 1;
441     }
442     return this.pause * HConstants.RETRY_BACKOFF[triesCount];
443   }
444
445   @Override
446   public void createTable(HTableDescriptor desc)
447   throws IOException {
448     createTable(desc, null);
449   }
450
451   @Override
452   public void createTable(HTableDescriptor desc, byte [] startKey,
453       byte [] endKey, int numRegions)
454   throws IOException {
455     if(numRegions < 3) {
456       throw new IllegalArgumentException("Must create at least three regions");
457     } else if(Bytes.compareTo(startKey, endKey) >= 0) {
458       throw new IllegalArgumentException("Start key must be smaller than end key");
459     }
460     if (numRegions == 3) {
461       createTable(desc, new byte[][]{startKey, endKey});
462       return;
463     }
464     byte [][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
465     if(splitKeys == null || splitKeys.length != numRegions - 1) {
466       throw new IllegalArgumentException("Unable to split key range into enough regions");
467     }
468     createTable(desc, splitKeys);
469   }
470
471   @Override
472   public void createTable(final HTableDescriptor desc, byte [][] splitKeys)
473       throws IOException {
474     get(createTableAsync(desc, splitKeys), syncWaitTimeout, TimeUnit.MILLISECONDS);
475   }
476
477   @Override
478   public Future<Void> createTableAsync(final HTableDescriptor desc, final byte[][] splitKeys)
479       throws IOException {
480     if (desc.getTableName() == null) {
481       throw new IllegalArgumentException("TableName cannot be null");
482     }
483     if (splitKeys != null && splitKeys.length > 0) {
484       Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
485       // Verify there are no duplicate split keys
486       byte[] lastKey = null;
487       for (byte[] splitKey : splitKeys) {
488         if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
489           throw new IllegalArgumentException(
490               "Empty split key must not be passed in the split keys.");
491         }
492         if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
493           throw new IllegalArgumentException("All split keys must be unique, " +
494             "found duplicate: " + Bytes.toStringBinary(splitKey) +
495             ", " + Bytes.toStringBinary(lastKey));
496         }
497         lastKey = splitKey;
498       }
499     }
500
501     CreateTableResponse response = executeCallable(
502       new MasterCallable<CreateTableResponse>(getConnection()) {
503         @Override
504         public CreateTableResponse call(int callTimeout) throws ServiceException {
505           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
506           controller.setCallTimeout(callTimeout);
507           controller.setPriority(desc.getTableName());
508           CreateTableRequest request = RequestConverter.buildCreateTableRequest(
509             desc, splitKeys, ng.getNonceGroup(), ng.newNonce());
510           return master.createTable(controller, request);
511         }
512       });
513     return new CreateTableFuture(this, desc, splitKeys, response);
514   }
515
516   private static class CreateTableFuture extends TableFuture<Void> {
517     private final HTableDescriptor desc;
518     private final byte[][] splitKeys;
519
520     public CreateTableFuture(final HBaseAdmin admin, final HTableDescriptor desc,
521         final byte[][] splitKeys, final CreateTableResponse response) {
522       super(admin, desc.getTableName(),
523               (response != null && response.hasProcId()) ? response.getProcId() : null);
524       this.splitKeys = splitKeys;
525       this.desc = desc;
526     }
527
528     @Override
529     protected HTableDescriptor getTableDescriptor() {
530       return desc;
531     }
532
533     @Override
534     public String getOperationType() {
535       return "CREATE";
536     }
537
538     @Override
539     protected Void waitOperationResult(final long deadlineTs) throws IOException, TimeoutException {
540       waitForTableEnabled(deadlineTs);
541       waitForAllRegionsOnline(deadlineTs, splitKeys);
542       return null;
543     }
544   }
545
546   @Override
547   public void deleteTable(final TableName tableName) throws IOException {
548     get(deleteTableAsync(tableName), syncWaitTimeout, TimeUnit.MILLISECONDS);
549   }
550
551   @Override
552   public Future<Void> deleteTableAsync(final TableName tableName) throws IOException {
553     DeleteTableResponse response = executeCallable(
554       new MasterCallable<DeleteTableResponse>(getConnection()) {
555         @Override
556         public DeleteTableResponse call(int callTimeout) throws ServiceException {
557           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
558           controller.setCallTimeout(callTimeout);
559           controller.setPriority(tableName);
560           DeleteTableRequest req =
561               RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(),ng.newNonce());
562           return master.deleteTable(controller,req);
563         }
564       });
565     return new DeleteTableFuture(this, tableName, response);
566   }
567
568   private static class DeleteTableFuture extends TableFuture<Void> {
569     public DeleteTableFuture(final HBaseAdmin admin, final TableName tableName,
570         final DeleteTableResponse response) {
571       super(admin, tableName,
572               (response != null && response.hasProcId()) ? response.getProcId() : null);
573     }
574
575     @Override
576     public String getOperationType() {
577       return "DELETE";
578     }
579
580     @Override
581     protected Void waitOperationResult(final long deadlineTs)
582         throws IOException, TimeoutException {
583       waitTableNotFound(deadlineTs);
584       return null;
585     }
586
587     @Override
588     protected Void postOperationResult(final Void result, final long deadlineTs)
589         throws IOException, TimeoutException {
590       // Delete cached information to prevent clients from using old locations
591       ((ClusterConnection) getAdmin().getConnection()).clearRegionCache(getTableName());
592       return super.postOperationResult(result, deadlineTs);
593     }
594   }
595
596   @Override
597   public HTableDescriptor[] deleteTables(String regex) throws IOException {
598     return deleteTables(Pattern.compile(regex));
599   }
600
601   /**
602    * Delete tables matching the passed in pattern and wait on completion.
603    *
604    * Warning: Use this method carefully, there is no prompting and the effect is
605    * immediate. Consider using {@link #listTables(java.util.regex.Pattern) } and
606    * {@link #deleteTable(TableName)}
607    *
608    * @param pattern The pattern to match table names against
609    * @return Table descriptors for tables that couldn't be deleted
610    * @throws IOException
611    */
612   @Override
613   public HTableDescriptor[] deleteTables(Pattern pattern) throws IOException {
614     List<HTableDescriptor> failed = new LinkedList<HTableDescriptor>();
615     for (HTableDescriptor table : listTables(pattern)) {
616       try {
617         deleteTable(table.getTableName());
618       } catch (IOException ex) {
619         LOG.info("Failed to delete table " + table.getTableName(), ex);
620         failed.add(table);
621       }
622     }
623     return failed.toArray(new HTableDescriptor[failed.size()]);
624   }
625
626   @Override
627   public void truncateTable(final TableName tableName, final boolean preserveSplits)
628       throws IOException {
629     get(truncateTableAsync(tableName, preserveSplits), syncWaitTimeout, TimeUnit.MILLISECONDS);
630   }
631
632   @Override
633   public Future<Void> truncateTableAsync(final TableName tableName, final boolean preserveSplits)
634       throws IOException {
635     TruncateTableResponse response =
636         executeCallable(new MasterCallable<TruncateTableResponse>(getConnection()) {
637           @Override
638           public TruncateTableResponse call(int callTimeout) throws ServiceException {
639             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
640             controller.setCallTimeout(callTimeout);
641             controller.setPriority(tableName);
642             LOG.info("Started truncating " + tableName);
643             TruncateTableRequest req = RequestConverter.buildTruncateTableRequest(
644               tableName, preserveSplits, ng.getNonceGroup(), ng.newNonce());
645             return master.truncateTable(controller, req);
646           }
647         });
648     return new TruncateTableFuture(this, tableName, preserveSplits, response);
649   }
650
651   private static class TruncateTableFuture extends TableFuture<Void> {
652     private final boolean preserveSplits;
653
654     public TruncateTableFuture(final HBaseAdmin admin, final TableName tableName,
655         final boolean preserveSplits, final TruncateTableResponse response) {
656       super(admin, tableName,
657              (response != null && response.hasProcId()) ? response.getProcId() : null);
658       this.preserveSplits = preserveSplits;
659     }
660
661     @Override
662     public String getOperationType() {
663       return "TRUNCATE";
664     }
665
666     @Override
667     protected Void waitOperationResult(final long deadlineTs) throws IOException, TimeoutException {
668       waitForTableEnabled(deadlineTs);
669       // once the table is enabled, we know the operation is done. so we can fetch the splitKeys
670       byte[][] splitKeys = preserveSplits ? getAdmin().getTableSplits(getTableName()) : null;
671       waitForAllRegionsOnline(deadlineTs, splitKeys);
672       return null;
673     }
674   }
675
676   private byte[][] getTableSplits(final TableName tableName) throws IOException {
677     byte[][] splits = null;
678     try (RegionLocator locator = getConnection().getRegionLocator(tableName)) {
679       byte[][] startKeys = locator.getStartKeys();
680       if (startKeys.length == 1) {
681         return splits;
682       }
683       splits = new byte[startKeys.length - 1][];
684       for (int i = 1; i < startKeys.length; i++) {
685         splits[i - 1] = startKeys[i];
686       }
687     }
688     return splits;
689   }
690
691   @Override
692   public void enableTable(final TableName tableName)
693   throws IOException {
694     get(enableTableAsync(tableName), syncWaitTimeout, TimeUnit.MILLISECONDS);
695   }
696
697   @Override
698   public Future<Void> enableTableAsync(final TableName tableName) throws IOException {
699     TableName.isLegalFullyQualifiedTableName(tableName.getName());
700     EnableTableResponse response = executeCallable(
701       new MasterCallable<EnableTableResponse>(getConnection()) {
702         @Override
703         public EnableTableResponse call(int callTimeout) throws ServiceException {
704           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
705           controller.setCallTimeout(callTimeout);
706           controller.setPriority(tableName);
707
708           LOG.info("Started enable of " + tableName);
709           EnableTableRequest req =
710               RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(),ng.newNonce());
711           return master.enableTable(controller,req);
712         }
713       });
714     return new EnableTableFuture(this, tableName, response);
715   }
716
717   private static class EnableTableFuture extends TableFuture<Void> {
718     public EnableTableFuture(final HBaseAdmin admin, final TableName tableName,
719         final EnableTableResponse response) {
720       super(admin, tableName,
721               (response != null && response.hasProcId()) ? response.getProcId() : null);
722     }
723
724     @Override
725     public String getOperationType() {
726       return "ENABLE";
727     }
728
729     @Override
730     protected Void waitOperationResult(final long deadlineTs) throws IOException, TimeoutException {
731       waitForTableEnabled(deadlineTs);
732       return null;
733     }
734   }
735
736   @Override
737   public HTableDescriptor[] enableTables(String regex) throws IOException {
738     return enableTables(Pattern.compile(regex));
739   }
740
741   @Override
742   public HTableDescriptor[] enableTables(Pattern pattern) throws IOException {
743     List<HTableDescriptor> failed = new LinkedList<HTableDescriptor>();
744     for (HTableDescriptor table : listTables(pattern)) {
745       if (isTableDisabled(table.getTableName())) {
746         try {
747           enableTable(table.getTableName());
748         } catch (IOException ex) {
749           LOG.info("Failed to enable table " + table.getTableName(), ex);
750           failed.add(table);
751         }
752       }
753     }
754     return failed.toArray(new HTableDescriptor[failed.size()]);
755   }
756
757   @Override
758   public void disableTable(final TableName tableName)
759   throws IOException {
760     get(disableTableAsync(tableName), syncWaitTimeout, TimeUnit.MILLISECONDS);
761   }
762
763   @Override
764   public Future<Void> disableTableAsync(final TableName tableName) throws IOException {
765     TableName.isLegalFullyQualifiedTableName(tableName.getName());
766     DisableTableResponse response = executeCallable(
767       new MasterCallable<DisableTableResponse>(getConnection()) {
768         @Override
769         public DisableTableResponse call(int callTimeout) throws ServiceException {
770           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
771           controller.setCallTimeout(callTimeout);
772           controller.setPriority(tableName);
773
774           LOG.info("Started disable of " + tableName);
775           DisableTableRequest req =
776               RequestConverter.buildDisableTableRequest(
777                 tableName, ng.getNonceGroup(), ng.newNonce());
778           return master.disableTable(controller, req);
779         }
780       });
781     return new DisableTableFuture(this, tableName, response);
782   }
783
784   private static class DisableTableFuture extends TableFuture<Void> {
785     public DisableTableFuture(final HBaseAdmin admin, final TableName tableName,
786         final DisableTableResponse response) {
787       super(admin, tableName,
788               (response != null && response.hasProcId()) ? response.getProcId() : null);
789     }
790
791     @Override
792     public String getOperationType() {
793       return "DISABLE";
794     }
795
796     @Override
797     protected Void waitOperationResult(long deadlineTs) throws IOException, TimeoutException {
798       waitForTableDisabled(deadlineTs);
799       return null;
800     }
801   }
802
803   @Override
804   public HTableDescriptor[] disableTables(String regex) throws IOException {
805     return disableTables(Pattern.compile(regex));
806   }
807
808   @Override
809   public HTableDescriptor[] disableTables(Pattern pattern) throws IOException {
810     List<HTableDescriptor> failed = new LinkedList<HTableDescriptor>();
811     for (HTableDescriptor table : listTables(pattern)) {
812       if (isTableEnabled(table.getTableName())) {
813         try {
814           disableTable(table.getTableName());
815         } catch (IOException ex) {
816           LOG.info("Failed to disable table " + table.getTableName(), ex);
817           failed.add(table);
818         }
819       }
820     }
821     return failed.toArray(new HTableDescriptor[failed.size()]);
822   }
823
824   @Override
825   public boolean isTableEnabled(final TableName tableName) throws IOException {
826     checkTableExists(tableName);
827     return executeCallable(new ConnectionCallable<Boolean>(getConnection()) {
828       @Override
829       public Boolean call(int callTimeout) throws ServiceException, IOException {
830         TableState tableState = MetaTableAccessor.getTableState(connection, tableName);
831         if (tableState == null)
832           throw new TableNotFoundException(tableName);
833         return tableState.inStates(TableState.State.ENABLED);
834       }
835     });
836   }
837
838   @Override
839   public boolean isTableDisabled(TableName tableName) throws IOException {
840     checkTableExists(tableName);
841     return connection.isTableDisabled(tableName);
842   }
843
844   @Override
845   public boolean isTableAvailable(TableName tableName) throws IOException {
846     return connection.isTableAvailable(tableName, null);
847   }
848
849   @Override
850   public boolean isTableAvailable(TableName tableName, byte[][] splitKeys) throws IOException {
851     return connection.isTableAvailable(tableName, splitKeys);
852   }
853
854   @Override
855   public Pair<Integer, Integer> getAlterStatus(final TableName tableName) throws IOException {
856     return executeCallable(new MasterCallable<Pair<Integer, Integer>>(getConnection()) {
857       @Override
858       public Pair<Integer, Integer> call(int callTimeout) throws ServiceException {
859         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
860         controller.setCallTimeout(callTimeout);
861         controller.setPriority(tableName);
862
863         GetSchemaAlterStatusRequest req = RequestConverter
864             .buildGetSchemaAlterStatusRequest(tableName);
865         GetSchemaAlterStatusResponse ret = master.getSchemaAlterStatus(controller, req);
866         Pair<Integer, Integer> pair = new Pair<>(ret.getYetToUpdateRegions(),
867             ret.getTotalRegions());
868         return pair;
869       }
870     });
871   }
872
873   @Override
874   public Pair<Integer, Integer> getAlterStatus(final byte[] tableName) throws IOException {
875     return getAlterStatus(TableName.valueOf(tableName));
876   }
877
878   /**
879    * {@inheritDoc}
880    * @deprecated Since 2.0. Will be removed in 3.0. Use
881    *     {@link #addColumnFamily(TableName, HColumnDescriptor)} instead.
882    */
883   @Override
884   @Deprecated
885   public void addColumn(final TableName tableName, final HColumnDescriptor columnFamily)
886   throws IOException {
887     addColumnFamily(tableName, columnFamily);
888   }
889
890   @Override
891   public Future<Void> addColumnFamily(final TableName tableName,
892       final HColumnDescriptor columnFamily) throws IOException {
893     AddColumnResponse response =
894         executeCallable(new MasterCallable<AddColumnResponse>(getConnection()) {
895           @Override
896           public AddColumnResponse call(int callTimeout) throws ServiceException {
897             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
898             controller.setCallTimeout(callTimeout);
899             controller.setPriority(tableName);
900
901             AddColumnRequest req =
902                 RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
903                   ng.newNonce());
904             return master.addColumn(controller, req);
905           }
906         });
907     return new AddColumnFamilyFuture(this, tableName, response);
908   }
909
910   private static class AddColumnFamilyFuture extends ModifyTableFuture {
911     public AddColumnFamilyFuture(final HBaseAdmin admin, final TableName tableName,
912         final AddColumnResponse response) {
913       super(admin, tableName, (response != null && response.hasProcId()) ? response.getProcId()
914           : null);
915     }
916
917     @Override
918     public String getOperationType() {
919       return "ADD_COLUMN_FAMILY";
920     }
921   }
922
923   /**
924    * {@inheritDoc}
925    * @deprecated Since 2.0. Will be removed in 3.0. Use
926    *     {@link #deleteColumnFamily(TableName, byte[])} instead.
927    */
928   @Override
929   @Deprecated
930   public void deleteColumn(final TableName tableName, final byte[] columnFamily)
931   throws IOException {
932     deleteColumnFamily(tableName, columnFamily);
933   }
934
935   @Override
936   public Future<Void> deleteColumnFamily(final TableName tableName, final byte[] columnFamily)
937       throws IOException {
938     DeleteColumnResponse response =
939         executeCallable(new MasterCallable<DeleteColumnResponse>(getConnection()) {
940           @Override
941           public DeleteColumnResponse call(int callTimeout) throws ServiceException {
942             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
943             controller.setCallTimeout(callTimeout);
944             controller.setPriority(tableName);
945
946             DeleteColumnRequest req =
947                 RequestConverter.buildDeleteColumnRequest(tableName, columnFamily,
948                   ng.getNonceGroup(), ng.newNonce());
949             master.deleteColumn(controller, req);
950             return null;
951           }
952         });
953     return new DeleteColumnFamilyFuture(this, tableName, response);
954   }
955
956   private static class DeleteColumnFamilyFuture extends ModifyTableFuture {
957     public DeleteColumnFamilyFuture(final HBaseAdmin admin, final TableName tableName,
958         final DeleteColumnResponse response) {
959       super(admin, tableName, (response != null && response.hasProcId()) ? response.getProcId()
960           : null);
961     }
962
963     @Override
964     public String getOperationType() {
965       return "DELETE_COLUMN_FAMILY";
966     }
967   }
968
969   /**
970    * {@inheritDoc}
971    * @deprecated As of 2.0. Will be removed in 3.0. Use
972    *     {@link #modifyColumnFamily(TableName, HColumnDescriptor)} instead.
973    */
974   @Override
975   @Deprecated
976   public void modifyColumn(final TableName tableName, final HColumnDescriptor columnFamily)
977   throws IOException {
978     modifyColumnFamily(tableName, columnFamily);
979   }
980
981   @Override
982   public Future<Void> modifyColumnFamily(final TableName tableName,
983       final HColumnDescriptor columnFamily) throws IOException {
984     ModifyColumnResponse response =
985         executeCallable(new MasterCallable<ModifyColumnResponse>(getConnection()) {
986           @Override
987           public ModifyColumnResponse call(int callTimeout) throws ServiceException {
988             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
989             controller.setCallTimeout(callTimeout);
990             controller.setPriority(tableName);
991
992             ModifyColumnRequest req =
993                 RequestConverter.buildModifyColumnRequest(tableName, columnFamily,
994                   ng.getNonceGroup(), ng.newNonce());
995             master.modifyColumn(controller, req);
996             return null;
997           }
998         });
999     return new ModifyColumnFamilyFuture(this, tableName, response);
1000   }
1001
1002   private static class ModifyColumnFamilyFuture extends ModifyTableFuture {
1003     public ModifyColumnFamilyFuture(final HBaseAdmin admin, final TableName tableName,
1004         final ModifyColumnResponse response) {
1005       super(admin, tableName, (response != null && response.hasProcId()) ? response.getProcId()
1006           : null);
1007     }
1008
1009     @Override
1010     public String getOperationType() {
1011       return "MODIFY_COLUMN_FAMILY";
1012     }
1013   }
1014
1015   @Override
1016   public void closeRegion(final String regionname, final String serverName) throws IOException {
1017     closeRegion(Bytes.toBytes(regionname), serverName);
1018   }
1019
1020   @Override
1021   public void closeRegion(final byte [] regionname, final String serverName) throws IOException {
1022     if (serverName != null) {
1023       Pair<HRegionInfo, ServerName> pair = MetaTableAccessor.getRegion(connection, regionname);
1024       if (pair == null || pair.getFirst() == null) {
1025         throw new UnknownRegionException(Bytes.toStringBinary(regionname));
1026       } else {
1027         closeRegion(ServerName.valueOf(serverName), pair.getFirst());
1028       }
1029     } else {
1030       Pair<HRegionInfo, ServerName> pair = MetaTableAccessor.getRegion(connection, regionname);
1031       if (pair == null) {
1032         throw new UnknownRegionException(Bytes.toStringBinary(regionname));
1033       } else if (pair.getSecond() == null) {
1034         throw new NoServerForRegionException(Bytes.toStringBinary(regionname));
1035       } else {
1036         closeRegion(pair.getSecond(), pair.getFirst());
1037       }
1038     }
1039   }
1040
1041   @Override
1042   public boolean closeRegionWithEncodedRegionName(final String encodedRegionName,
1043       final String serverName) throws IOException {
1044     if (null == serverName || ("").equals(serverName.trim())) {
1045       throw new IllegalArgumentException(
1046           "The servername cannot be null or empty.");
1047     }
1048     ServerName sn = ServerName.valueOf(serverName);
1049     AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
1050     // Close the region without updating zk state.
1051     CloseRegionRequest request =
1052       RequestConverter.buildCloseRegionRequest(sn, encodedRegionName);
1053     try {
1054       PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1055
1056       // TODO: this does not do retries, it should. Set priority and timeout in controller
1057       CloseRegionResponse response = admin.closeRegion(controller, request);
1058       boolean isRegionClosed = response.getClosed();
1059       if (false == isRegionClosed) {
1060         LOG.error("Not able to close the region " + encodedRegionName + ".");
1061       }
1062       return isRegionClosed;
1063     } catch (ServiceException se) {
1064       throw ProtobufUtil.getRemoteException(se);
1065     }
1066   }
1067
1068   @Override
1069   public void closeRegion(final ServerName sn, final HRegionInfo hri) throws IOException {
1070     AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
1071     PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1072
1073     // Close the region without updating zk state.
1074     ProtobufUtil.closeRegion(controller, admin, sn, hri.getRegionName());
1075   }
1076
1077   @Override
1078   public List<HRegionInfo> getOnlineRegions(final ServerName sn) throws IOException {
1079     AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
1080     PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1081     return ProtobufUtil.getOnlineRegions(controller, admin);
1082   }
1083
1084   @Override
1085   public void flush(final TableName tableName) throws IOException {
1086     checkTableExists(tableName);
1087     if (isTableDisabled(tableName)) {
1088       LOG.info("Table is disabled: " + tableName.getNameAsString());
1089       return;
1090     }
1091     execProcedure("flush-table-proc", tableName.getNameAsString(),
1092       new HashMap<String, String>());
1093   }
1094
1095   @Override
1096   public void flushRegion(final byte[] regionName) throws IOException {
1097     Pair<HRegionInfo, ServerName> regionServerPair = getRegion(regionName);
1098     if (regionServerPair == null) {
1099       throw new IllegalArgumentException("Unknown regionname: " + Bytes.toStringBinary(regionName));
1100     }
1101     if (regionServerPair.getSecond() == null) {
1102       throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
1103     }
1104     HRegionInfo hRegionInfo = regionServerPair.getFirst();
1105     ServerName serverName = regionServerPair.getSecond();
1106
1107     PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1108
1109     AdminService.BlockingInterface admin = this.connection.getAdmin(serverName);
1110     FlushRegionRequest request =
1111         RequestConverter.buildFlushRegionRequest(hRegionInfo.getRegionName());
1112     try {
1113       // TODO: this does not do retries, it should. Set priority and timeout in controller
1114       admin.flushRegion(controller, request);
1115     } catch (ServiceException se) {
1116       throw ProtobufUtil.getRemoteException(se);
1117     }
1118   }
1119
1120   /**
1121    * {@inheritDoc}
1122    */
1123   @Override
1124   public void compact(final TableName tableName)
1125     throws IOException {
1126     compact(tableName, null, false, CompactType.NORMAL);
1127   }
1128
1129   @Override
1130   public void compactRegion(final byte[] regionName)
1131     throws IOException {
1132     compactRegion(regionName, null, false);
1133   }
1134
1135   /**
1136    * {@inheritDoc}
1137    */
1138   @Override
1139   public void compact(final TableName tableName, final byte[] columnFamily)
1140     throws IOException {
1141     compact(tableName, columnFamily, false, CompactType.NORMAL);
1142   }
1143
1144   /**
1145    * {@inheritDoc}
1146    */
1147   @Override
1148   public void compactRegion(final byte[] regionName, final byte[] columnFamily)
1149     throws IOException {
1150     compactRegion(regionName, columnFamily, false);
1151   }
1152
1153   /**
1154    * {@inheritDoc}
1155    */
1156   @Override
1157   public void compactRegionServer(final ServerName sn, boolean major)
1158   throws IOException, InterruptedException {
1159     for (HRegionInfo region : getOnlineRegions(sn)) {
1160       compact(sn, region, major, null);
1161     }
1162   }
1163
1164   @Override
1165   public void majorCompact(final TableName tableName)
1166   throws IOException {
1167     compact(tableName, null, true, CompactType.NORMAL);
1168   }
1169
1170   @Override
1171   public void majorCompactRegion(final byte[] regionName)
1172   throws IOException {
1173     compactRegion(regionName, null, true);
1174   }
1175
1176   /**
1177    * {@inheritDoc}
1178    */
1179   @Override
1180   public void majorCompact(final TableName tableName, final byte[] columnFamily)
1181   throws IOException {
1182     compact(tableName, columnFamily, true, CompactType.NORMAL);
1183   }
1184
1185   @Override
1186   public void majorCompactRegion(final byte[] regionName, final byte[] columnFamily)
1187   throws IOException {
1188     compactRegion(regionName, columnFamily, true);
1189   }
1190
1191   /**
1192    * Compact a table.
1193    * Asynchronous operation.
1194    *
1195    * @param tableName table or region to compact
1196    * @param columnFamily column family within a table or region
1197    * @param major True if we are to do a major compaction.
1198    * @throws IOException if a remote or network exception occurs
1199    * @throws InterruptedException
1200    */
1201   private void compact(final TableName tableName, final byte[] columnFamily,final boolean major,
1202                        CompactType compactType) throws IOException {
1203     switch (compactType) {
1204       case MOB:
1205         ServerName master = getMasterAddress();
1206         compact(master, getMobRegionInfo(tableName), major, columnFamily);
1207         break;
1208       case NORMAL:
1209       default:
1210         ZooKeeperWatcher zookeeper = null;
1211         try {
1212           checkTableExists(tableName);
1213           zookeeper = new ZooKeeperWatcher(conf, ZK_IDENTIFIER_PREFIX + connection.toString(),
1214                   new ThrowableAbortable());
1215           List<Pair<HRegionInfo, ServerName>> pairs;
1216           if (TableName.META_TABLE_NAME.equals(tableName)) {
1217             pairs = new MetaTableLocator().getMetaRegionsAndLocations(zookeeper);
1218           } else {
1219             pairs = MetaTableAccessor.getTableRegionsAndLocations(connection, tableName);
1220           }
1221           for (Pair<HRegionInfo, ServerName> pair: pairs) {
1222             if (pair.getFirst().isOffline()) continue;
1223             if (pair.getSecond() == null) continue;
1224             try {
1225               compact(pair.getSecond(), pair.getFirst(), major, columnFamily);
1226             } catch (NotServingRegionException e) {
1227               if (LOG.isDebugEnabled()) {
1228                 LOG.debug("Trying to" + (major ? " major" : "") + " compact " +
1229                         pair.getFirst() + ": " +
1230                         StringUtils.stringifyException(e));
1231               }
1232             }
1233           }
1234         } finally {
1235           if (zookeeper != null) {
1236             zookeeper.close();
1237           }
1238         }
1239         break;
1240     }
1241   }
1242
1243   /**
1244    * Compact an individual region.
1245    * Asynchronous operation.
1246    *
1247    * @param regionName region to compact
1248    * @param columnFamily column family within a table or region
1249    * @param major True if we are to do a major compaction.
1250    * @throws IOException if a remote or network exception occurs
1251    * @throws InterruptedException
1252    */
1253   private void compactRegion(final byte[] regionName, final byte[] columnFamily,final boolean major)
1254   throws IOException {
1255     Pair<HRegionInfo, ServerName> regionServerPair = getRegion(regionName);
1256     if (regionServerPair == null) {
1257       throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName));
1258     }
1259     if (regionServerPair.getSecond() == null) {
1260       throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
1261     }
1262     compact(regionServerPair.getSecond(), regionServerPair.getFirst(), major, columnFamily);
1263   }
1264
1265   private void compact(final ServerName sn, final HRegionInfo hri,
1266       final boolean major, final byte [] family)
1267   throws IOException {
1268     PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1269     AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
1270     CompactRegionRequest request =
1271       RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family);
1272     try {
1273       // TODO: this does not do retries, it should. Set priority and timeout in controller
1274       admin.compactRegion(controller, request);
1275     } catch (ServiceException se) {
1276       throw ProtobufUtil.getRemoteException(se);
1277     }
1278   }
1279
1280   @Override
1281   public void move(final byte [] encodedRegionName, final byte [] destServerName)
1282       throws IOException {
1283
1284     executeCallable(new MasterCallable<Void>(getConnection()) {
1285       @Override
1286       public Void call(int callTimeout) throws ServiceException {
1287         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1288         controller.setCallTimeout(callTimeout);
1289         // Hard to know the table name, at least check if meta
1290         if (isMetaRegion(encodedRegionName)) {
1291           controller.setPriority(TableName.META_TABLE_NAME);
1292         }
1293
1294         try {
1295           MoveRegionRequest request =
1296               RequestConverter.buildMoveRegionRequest(encodedRegionName, destServerName);
1297             master.moveRegion(controller, request);
1298         } catch (DeserializationException de) {
1299           LOG.error("Could not parse destination server name: " + de);
1300           throw new ServiceException(new DoNotRetryIOException(de));
1301         }
1302         return null;
1303       }
1304     });
1305   }
1306
1307   private boolean isMetaRegion(final byte[] regionName) {
1308     return Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getRegionName())
1309         || Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
1310   }
1311
1312   @Override
1313   public void assign(final byte[] regionName) throws MasterNotRunningException,
1314       ZooKeeperConnectionException, IOException {
1315     final byte[] toBeAssigned = getRegionName(regionName);
1316     executeCallable(new MasterCallable<Void>(getConnection()) {
1317       @Override
1318       public Void call(int callTimeout) throws ServiceException {
1319         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1320         controller.setCallTimeout(callTimeout);
1321         // Hard to know the table name, at least check if meta
1322         if (isMetaRegion(regionName)) {
1323           controller.setPriority(TableName.META_TABLE_NAME);
1324         }
1325
1326         AssignRegionRequest request =
1327           RequestConverter.buildAssignRegionRequest(toBeAssigned);
1328         master.assignRegion(controller,request);
1329         return null;
1330       }
1331     });
1332   }
1333
1334   @Override
1335   public void unassign(final byte [] regionName, final boolean force)
1336   throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
1337     final byte[] toBeUnassigned = getRegionName(regionName);
1338     executeCallable(new MasterCallable<Void>(getConnection()) {
1339       @Override
1340       public Void call(int callTimeout) throws ServiceException {
1341         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1342         controller.setCallTimeout(callTimeout);
1343         // Hard to know the table name, at least check if meta
1344         if (isMetaRegion(regionName)) {
1345           controller.setPriority(TableName.META_TABLE_NAME);
1346         }
1347         UnassignRegionRequest request =
1348           RequestConverter.buildUnassignRegionRequest(toBeUnassigned, force);
1349         master.unassignRegion(controller, request);
1350         return null;
1351       }
1352     });
1353   }
1354
1355   @Override
1356   public void offline(final byte [] regionName)
1357   throws IOException {
1358     executeCallable(new MasterCallable<Void>(getConnection()) {
1359       @Override
1360       public Void call(int callTimeout) throws ServiceException {
1361         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1362         controller.setCallTimeout(callTimeout);
1363         // Hard to know the table name, at least check if meta
1364         if (isMetaRegion(regionName)) {
1365           controller.setPriority(TableName.META_TABLE_NAME);
1366         }
1367         master.offlineRegion(controller, RequestConverter.buildOfflineRegionRequest(regionName));
1368         return null;
1369       }
1370     });
1371   }
1372
1373   @Override
1374   public boolean setBalancerRunning(final boolean on, final boolean synchronous)
1375   throws IOException {
1376     return executeCallable(new MasterCallable<Boolean>(getConnection()) {
1377       @Override
1378       public Boolean call(int callTimeout) throws ServiceException {
1379         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1380         controller.setCallTimeout(callTimeout);
1381
1382         SetBalancerRunningRequest req =
1383             RequestConverter.buildSetBalancerRunningRequest(on, synchronous);
1384         return master.setBalancerRunning(controller, req).getPrevBalanceValue();
1385       }
1386     });
1387   }
1388
1389   @Override
1390   public boolean balancer() throws IOException {
1391     return executeCallable(new MasterCallable<Boolean>(getConnection()) {
1392       @Override
1393       public Boolean call(int callTimeout) throws ServiceException {
1394         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1395         controller.setCallTimeout(callTimeout);
1396
1397         return master.balance(controller,
1398           RequestConverter.buildBalanceRequest(false)).getBalancerRan();
1399       }
1400     });
1401   }
1402
1403   @Override
1404   public boolean balancer(final boolean force) throws IOException {
1405     return executeCallable(new MasterCallable<Boolean>(getConnection()) {
1406       @Override
1407       public Boolean call(int callTimeout) throws ServiceException {
1408         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1409         controller.setCallTimeout(callTimeout);
1410
1411         return master.balance(controller,
1412           RequestConverter.buildBalanceRequest(force)).getBalancerRan();
1413       }
1414     });
1415   }
1416
1417   @Override
1418   public boolean isBalancerEnabled() throws IOException {
1419     return executeCallable(new MasterCallable<Boolean>(getConnection()) {
1420       @Override
1421       public Boolean call(int callTimeout) throws ServiceException {
1422         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1423         controller.setCallTimeout(callTimeout);
1424
1425         return master.isBalancerEnabled(controller,
1426           RequestConverter.buildIsBalancerEnabledRequest()).getEnabled();
1427       }
1428     });
1429   }
1430
1431   @Override
1432   public boolean normalize() throws IOException {
1433     return executeCallable(new MasterCallable<Boolean>(getConnection()) {
1434       @Override
1435       public Boolean call(int callTimeout) throws ServiceException {
1436         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1437         controller.setCallTimeout(callTimeout);
1438
1439         return master.normalize(controller,
1440           RequestConverter.buildNormalizeRequest()).getNormalizerRan();
1441       }
1442     });
1443   }
1444
1445   @Override
1446   public boolean isNormalizerEnabled() throws IOException {
1447     return executeCallable(new MasterCallable<Boolean>(getConnection()) {
1448       @Override
1449       public Boolean call(int callTimeout) throws ServiceException {
1450         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1451         controller.setCallTimeout(callTimeout);
1452
1453         return master.isNormalizerEnabled(controller,
1454           RequestConverter.buildIsNormalizerEnabledRequest()).getEnabled();
1455       }
1456     });
1457   }
1458
1459   @Override
1460   public boolean setNormalizerRunning(final boolean on) throws IOException {
1461     return executeCallable(new MasterCallable<Boolean>(getConnection()) {
1462       @Override
1463       public Boolean call(int callTimeout) throws ServiceException {
1464         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1465         controller.setCallTimeout(callTimeout);
1466
1467         SetNormalizerRunningRequest req =
1468           RequestConverter.buildSetNormalizerRunningRequest(on);
1469         return master.setNormalizerRunning(controller, req).getPrevNormalizerValue();
1470       }
1471     });
1472   }
1473
1474   @Override
1475   public boolean enableCatalogJanitor(final boolean enable) throws IOException {
1476     return executeCallable(new MasterCallable<Boolean>(getConnection()) {
1477       @Override
1478       public Boolean call(int callTimeout) throws ServiceException {
1479         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1480         controller.setCallTimeout(callTimeout);
1481
1482         return master.enableCatalogJanitor(controller,
1483           RequestConverter.buildEnableCatalogJanitorRequest(enable)).getPrevValue();
1484       }
1485     });
1486   }
1487
1488   @Override
1489   public int runCatalogScan() throws IOException {
1490     return executeCallable(new MasterCallable<Integer>(getConnection()) {
1491       @Override
1492       public Integer call(int callTimeout) throws ServiceException {
1493         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1494         controller.setCallTimeout(callTimeout);
1495
1496         return master.runCatalogScan(controller,
1497           RequestConverter.buildCatalogScanRequest()).getScanResult();
1498       }
1499     });
1500   }
1501
1502   @Override
1503   public boolean isCatalogJanitorEnabled() throws IOException {
1504     return executeCallable(new MasterCallable<Boolean>(getConnection()) {
1505       @Override
1506       public Boolean call(int callTimeout) throws ServiceException {
1507         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1508         controller.setCallTimeout(callTimeout);
1509
1510         return master.isCatalogJanitorEnabled(controller,
1511           RequestConverter.buildIsCatalogJanitorEnabledRequest()).getValue();
1512       }
1513     });
1514   }
1515
1516   private boolean isEncodedRegionName(byte[] regionName) throws IOException {
1517     try {
1518       HRegionInfo.parseRegionName(regionName);
1519       return false;
1520     } catch (IOException e) {
1521       if (StringUtils.stringifyException(e)
1522         .contains(HRegionInfo.INVALID_REGION_NAME_FORMAT_MESSAGE)) {
1523         return true;
1524       }
1525       throw e;
1526     }
1527   }
1528
1529   /**
1530    * Merge two regions. Asynchronous operation.
1531    * @param nameOfRegionA encoded or full name of region a
1532    * @param nameOfRegionB encoded or full name of region b
1533    * @param forcible true if do a compulsory merge, otherwise we will only merge
1534    *          two adjacent regions
1535    * @throws IOException
1536    */
1537   @Override
1538   public void mergeRegions(final byte[] nameOfRegionA,
1539       final byte[] nameOfRegionB, final boolean forcible)
1540       throws IOException {
1541     final byte[] encodedNameOfRegionA = isEncodedRegionName(nameOfRegionA) ?
1542       nameOfRegionA : HRegionInfo.encodeRegionName(nameOfRegionA).getBytes();
1543     final byte[] encodedNameOfRegionB = isEncodedRegionName(nameOfRegionB) ?
1544       nameOfRegionB : HRegionInfo.encodeRegionName(nameOfRegionB).getBytes();
1545
1546     Pair<HRegionInfo, ServerName> pair = getRegion(nameOfRegionA);
1547     if (pair != null && pair.getFirst().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID)
1548       throw new IllegalArgumentException("Can't invoke merge on non-default regions directly");
1549     pair = getRegion(nameOfRegionB);
1550     if (pair != null && pair.getFirst().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID)
1551       throw new IllegalArgumentException("Can't invoke merge on non-default regions directly");
1552     executeCallable(new MasterCallable<Void>(getConnection()) {
1553       @Override
1554       public Void call(int callTimeout) throws ServiceException {
1555         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1556         controller.setCallTimeout(callTimeout);
1557
1558         try {
1559           DispatchMergingRegionsRequest request = RequestConverter
1560               .buildDispatchMergingRegionsRequest(encodedNameOfRegionA,
1561                 encodedNameOfRegionB, forcible);
1562           master.dispatchMergingRegions(controller, request);
1563         } catch (DeserializationException de) {
1564           LOG.error("Could not parse destination server name: " + de);
1565         }
1566         return null;
1567       }
1568     });
1569   }
1570
1571   @Override
1572   public void split(final TableName tableName) throws IOException {
1573     split(tableName, null);
1574   }
1575
1576   @Override
1577   public void splitRegion(final byte[] regionName) throws IOException {
1578     splitRegion(regionName, null);
1579   }
1580
1581   /**
1582    * {@inheritDoc}
1583    */
1584   @Override
1585   public void split(final TableName tableName, final byte [] splitPoint) throws IOException {
1586     ZooKeeperWatcher zookeeper = null;
1587     try {
1588       checkTableExists(tableName);
1589       zookeeper = new ZooKeeperWatcher(conf, ZK_IDENTIFIER_PREFIX + connection.toString(),
1590         new ThrowableAbortable());
1591       List<Pair<HRegionInfo, ServerName>> pairs;
1592       if (TableName.META_TABLE_NAME.equals(tableName)) {
1593         pairs = new MetaTableLocator().getMetaRegionsAndLocations(zookeeper);
1594       } else {
1595         pairs = MetaTableAccessor.getTableRegionsAndLocations(connection, tableName);
1596       }
1597       for (Pair<HRegionInfo, ServerName> pair: pairs) {
1598         // May not be a server for a particular row
1599         if (pair.getSecond() == null) continue;
1600         HRegionInfo r = pair.getFirst();
1601         // check for parents
1602         if (r.isSplitParent()) continue;
1603         // if a split point given, only split that particular region
1604         if (r.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
1605            (splitPoint != null && !r.containsRow(splitPoint))) continue;
1606         // call out to region server to do split now
1607         split(pair.getSecond(), pair.getFirst(), splitPoint);
1608       }
1609     } finally {
1610       if (zookeeper != null) {
1611         zookeeper.close();
1612       }
1613     }
1614   }
1615
1616   @Override
1617   public void splitRegion(final byte[] regionName, final byte [] splitPoint) throws IOException {
1618     Pair<HRegionInfo, ServerName> regionServerPair = getRegion(regionName);
1619     if (regionServerPair == null) {
1620       throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName));
1621     }
1622     if (regionServerPair.getFirst() != null &&
1623         regionServerPair.getFirst().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1624       throw new IllegalArgumentException("Can't split replicas directly. "
1625           + "Replicas are auto-split when their primary is split.");
1626     }
1627     if (regionServerPair.getSecond() == null) {
1628       throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
1629     }
1630     split(regionServerPair.getSecond(), regionServerPair.getFirst(), splitPoint);
1631   }
1632
1633   @VisibleForTesting
1634   public void split(final ServerName sn, final HRegionInfo hri,
1635       byte[] splitPoint) throws IOException {
1636     if (hri.getStartKey() != null && splitPoint != null &&
1637          Bytes.compareTo(hri.getStartKey(), splitPoint) == 0) {
1638        throw new IOException("should not give a splitkey which equals to startkey!");
1639     }
1640     PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1641     controller.setPriority(hri.getTable());
1642
1643     // TODO: this does not do retries, it should. Set priority and timeout in controller
1644     AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
1645     ProtobufUtil.split(controller, admin, hri, splitPoint);
1646   }
1647
1648   @Override
1649   public Future<Void> modifyTable(final TableName tableName, final HTableDescriptor htd)
1650       throws IOException {
1651     if (!tableName.equals(htd.getTableName())) {
1652       throw new IllegalArgumentException("the specified table name '" + tableName +
1653         "' doesn't match with the HTD one: " + htd.getTableName());
1654     }
1655
1656     ModifyTableResponse response = executeCallable(
1657       new MasterCallable<ModifyTableResponse>(getConnection()) {
1658         @Override
1659         public ModifyTableResponse call(int callTimeout) throws ServiceException {
1660           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1661           controller.setCallTimeout(callTimeout);
1662           controller.setPriority(tableName);
1663
1664           ModifyTableRequest request = RequestConverter.buildModifyTableRequest(
1665             tableName, htd, ng.getNonceGroup(), ng.newNonce());
1666           return master.modifyTable(controller, request);
1667         }
1668       });
1669
1670     return new ModifyTableFuture(this, tableName, response);
1671   }
1672
1673   private static class ModifyTableFuture extends TableFuture<Void> {
1674     public ModifyTableFuture(final HBaseAdmin admin, final TableName tableName,
1675         final ModifyTableResponse response) {
1676       super(admin, tableName,
1677           (response != null && response.hasProcId()) ? response.getProcId() : null);
1678     }
1679
1680     public ModifyTableFuture(final HBaseAdmin admin, final TableName tableName, final Long procId) {
1681       super(admin, tableName, procId);
1682     }
1683
1684     @Override
1685     public String getOperationType() {
1686       return "MODIFY";
1687     }
1688
1689     @Override
1690     protected Void postOperationResult(final Void result, final long deadlineTs)
1691         throws IOException, TimeoutException {
1692       // The modify operation on the table is asynchronous on the server side irrespective
1693       // of whether Procedure V2 is supported or not. So, we wait in the client till
1694       // all regions get updated.
1695       waitForSchemaUpdate(deadlineTs);
1696       return result;
1697     }
1698   }
1699
1700   /**
1701    * @param regionName Name of a region.
1702    * @return a pair of HRegionInfo and ServerName if <code>regionName</code> is
1703    *  a verified region name (we call {@link
1704    *  MetaTableAccessor#getRegionLocation(Connection, byte[])}
1705    *  else null.
1706    * Throw IllegalArgumentException if <code>regionName</code> is null.
1707    * @throws IOException
1708    */
1709   Pair<HRegionInfo, ServerName> getRegion(final byte[] regionName) throws IOException {
1710     if (regionName == null) {
1711       throw new IllegalArgumentException("Pass a table name or region name");
1712     }
1713     Pair<HRegionInfo, ServerName> pair =
1714       MetaTableAccessor.getRegion(connection, regionName);
1715     if (pair == null) {
1716       final AtomicReference<Pair<HRegionInfo, ServerName>> result =
1717         new AtomicReference<Pair<HRegionInfo, ServerName>>(null);
1718       final String encodedName = Bytes.toString(regionName);
1719       MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
1720         @Override
1721         public boolean visit(Result data) throws IOException {
1722           HRegionInfo info = MetaTableAccessor.getHRegionInfo(data);
1723           if (info == null) {
1724             LOG.warn("No serialized HRegionInfo in " + data);
1725             return true;
1726           }
1727           RegionLocations rl = MetaTableAccessor.getRegionLocations(data);
1728           boolean matched = false;
1729           ServerName sn = null;
1730           if (rl != null) {
1731             for (HRegionLocation h : rl.getRegionLocations()) {
1732               if (h != null && encodedName.equals(h.getRegionInfo().getEncodedName())) {
1733                 sn = h.getServerName();
1734                 info = h.getRegionInfo();
1735                 matched = true;
1736               }
1737             }
1738           }
1739           if (!matched) return true;
1740           result.set(new Pair<HRegionInfo, ServerName>(info, sn));
1741           return false; // found the region, stop
1742         }
1743       };
1744
1745       MetaTableAccessor.fullScanRegions(connection, visitor);
1746       pair = result.get();
1747     }
1748     return pair;
1749   }
1750
1751   /**
1752    * If the input is a region name, it is returned as is. If it's an
1753    * encoded region name, the corresponding region is found from meta
1754    * and its region name is returned. If we can't find any region in
1755    * meta matching the input as either region name or encoded region
1756    * name, the input is returned as is. We don't throw unknown
1757    * region exception.
1758    */
1759   private byte[] getRegionName(
1760       final byte[] regionNameOrEncodedRegionName) throws IOException {
1761     if (Bytes.equals(regionNameOrEncodedRegionName,
1762         HRegionInfo.FIRST_META_REGIONINFO.getRegionName())
1763           || Bytes.equals(regionNameOrEncodedRegionName,
1764             HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
1765       return HRegionInfo.FIRST_META_REGIONINFO.getRegionName();
1766     }
1767     byte[] tmp = regionNameOrEncodedRegionName;
1768     Pair<HRegionInfo, ServerName> regionServerPair = getRegion(regionNameOrEncodedRegionName);
1769     if (regionServerPair != null && regionServerPair.getFirst() != null) {
1770       tmp = regionServerPair.getFirst().getRegionName();
1771     }
1772     return tmp;
1773   }
1774
1775   /**
1776    * Check if table exists or not
1777    * @param tableName Name of a table.
1778    * @return tableName instance
1779    * @throws IOException if a remote or network exception occurs.
1780    * @throws TableNotFoundException if table does not exist.
1781    */
1782   private TableName checkTableExists(final TableName tableName)
1783       throws IOException {
1784     return executeCallable(new ConnectionCallable<TableName>(getConnection()) {
1785       @Override
1786       public TableName call(int callTimeout) throws ServiceException, IOException {
1787         if (!MetaTableAccessor.tableExists(connection, tableName)) {
1788           throw new TableNotFoundException(tableName);
1789         }
1790         return tableName;
1791       }
1792     });
1793   }
1794
1795   @Override
1796   public synchronized void shutdown() throws IOException {
1797     executeCallable(new MasterCallable<Void>(getConnection()) {
1798       @Override
1799       public Void call(int callTimeout) throws ServiceException {
1800         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1801         controller.setCallTimeout(callTimeout);
1802         controller.setPriority(HConstants.HIGH_QOS);
1803         master.shutdown(controller, ShutdownRequest.newBuilder().build());
1804         return null;
1805       }
1806     });
1807   }
1808
1809   @Override
1810   public synchronized void stopMaster() throws IOException {
1811     executeCallable(new MasterCallable<Void>(getConnection()) {
1812       @Override
1813       public Void call(int callTimeout) throws ServiceException {
1814         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1815         controller.setCallTimeout(callTimeout);
1816         controller.setPriority(HConstants.HIGH_QOS);
1817         master.stopMaster(controller, StopMasterRequest.newBuilder().build());
1818         return null;
1819       }
1820     });
1821   }
1822
1823   @Override
1824   public synchronized void stopRegionServer(final String hostnamePort)
1825   throws IOException {
1826     String hostname = Addressing.parseHostname(hostnamePort);
1827     int port = Addressing.parsePort(hostnamePort);
1828     AdminService.BlockingInterface admin =
1829       this.connection.getAdmin(ServerName.valueOf(hostname, port, 0));
1830     StopServerRequest request = RequestConverter.buildStopServerRequest(
1831       "Called by admin client " + this.connection.toString());
1832     PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1833
1834     controller.setPriority(HConstants.HIGH_QOS);
1835     try {
1836       // TODO: this does not do retries, it should. Set priority and timeout in controller
1837       admin.stopServer(controller, request);
1838     } catch (ServiceException se) {
1839       throw ProtobufUtil.getRemoteException(se);
1840     }
1841   }
1842
1843   @Override
1844   public ClusterStatus getClusterStatus() throws IOException {
1845     return executeCallable(new MasterCallable<ClusterStatus>(getConnection()) {
1846       @Override
1847       public ClusterStatus call(int callTimeout) throws ServiceException {
1848         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1849         controller.setCallTimeout(callTimeout);
1850         GetClusterStatusRequest req = RequestConverter.buildGetClusterStatusRequest();
1851         return ProtobufUtil.convert(master.getClusterStatus(controller, req).getClusterStatus());
1852       }
1853     });
1854   }
1855
1856   @Override
1857   public Configuration getConfiguration() {
1858     return this.conf;
1859   }
1860
1861   /**
1862    * Do a get with a timeout against the passed in <code>future<code>.
1863    */
1864   private static <T> T get(final Future<T> future, final long timeout, final TimeUnit units)
1865   throws IOException {
1866     try {
1867       // TODO: how long should we wait? Spin forever?
1868       return future.get(timeout, units);
1869     } catch (InterruptedException e) {
1870       throw new InterruptedIOException("Interrupt while waiting on " + future);
1871     } catch (TimeoutException e) {
1872       throw new TimeoutIOException(e);
1873     } catch (ExecutionException e) {
1874       if (e.getCause() instanceof IOException) {
1875         throw (IOException)e.getCause();
1876       } else {
1877         throw new IOException(e.getCause());
1878       }
1879     }
1880   }
1881
1882   @Override
1883   public void createNamespace(final NamespaceDescriptor descriptor)
1884   throws IOException {
1885     get(createNamespaceAsync(descriptor), this.syncWaitTimeout, TimeUnit.MILLISECONDS);
1886   }
1887
1888   @Override
1889   public Future<Void> createNamespaceAsync(final NamespaceDescriptor descriptor)
1890       throws IOException {
1891     CreateNamespaceResponse response =
1892         executeCallable(new MasterCallable<CreateNamespaceResponse>(getConnection()) {
1893           @Override
1894           public CreateNamespaceResponse call(int callTimeout) throws Exception {
1895             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1896             controller.setCallTimeout(callTimeout);
1897             // TODO: set priority based on NS?
1898             return master.createNamespace(controller,
1899               CreateNamespaceRequest.newBuilder()
1900               .setNamespaceDescriptor(ProtobufUtil
1901                 .toProtoNamespaceDescriptor(descriptor)).build()
1902                 );
1903           }
1904         });
1905     return new NamespaceFuture(this, descriptor.getName(), response.getProcId()) {
1906       @Override
1907       public String getOperationType() {
1908         return "CREATE_NAMESPACE";
1909       }
1910     };
1911   }
1912
1913   @Override
1914   public void modifyNamespace(final NamespaceDescriptor descriptor)
1915   throws IOException {
1916     get(modifyNamespaceAsync(descriptor), this.syncWaitTimeout, TimeUnit.MILLISECONDS);
1917   }
1918
1919   @Override
1920   public Future<Void> modifyNamespaceAsync(final NamespaceDescriptor descriptor)
1921       throws IOException {
1922     ModifyNamespaceResponse response =
1923         executeCallable(new MasterCallable<ModifyNamespaceResponse>(getConnection()) {
1924           @Override
1925           public ModifyNamespaceResponse call(int callTimeout) throws Exception {
1926             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1927             controller.setCallTimeout(callTimeout);
1928             // TODO: set priority based on NS?
1929             return master.modifyNamespace(controller, ModifyNamespaceRequest.newBuilder().
1930               setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build());
1931           }
1932         });
1933     return new NamespaceFuture(this, descriptor.getName(), response.getProcId()) {
1934       @Override
1935       public String getOperationType() {
1936         return "MODIFY_NAMESPACE";
1937       }
1938     };
1939   }
1940
1941   @Override
1942   public void deleteNamespace(final String name)
1943   throws IOException {
1944     get(deleteNamespaceAsync(name), this.syncWaitTimeout, TimeUnit.MILLISECONDS);
1945   }
1946
1947   @Override
1948   public Future<Void> deleteNamespaceAsync(final String name)
1949       throws IOException {
1950     DeleteNamespaceResponse response =
1951         executeCallable(new MasterCallable<DeleteNamespaceResponse>(getConnection()) {
1952           @Override
1953           public DeleteNamespaceResponse call(int callTimeout) throws Exception {
1954             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1955             controller.setCallTimeout(callTimeout);
1956             // TODO: set priority based on NS?
1957             return master.deleteNamespace(controller, DeleteNamespaceRequest.newBuilder().
1958               setNamespaceName(name).build());
1959           }
1960         });
1961     return new NamespaceFuture(this, name, response.getProcId()) {
1962       @Override
1963       public String getOperationType() {
1964         return "DELETE_NAMESPACE";
1965       }
1966     };
1967   }
1968
1969   @Override
1970   public NamespaceDescriptor getNamespaceDescriptor(final String name) throws IOException {
1971     return
1972         executeCallable(new MasterCallable<NamespaceDescriptor>(getConnection()) {
1973           @Override
1974           public NamespaceDescriptor call(int callTimeout) throws Exception {
1975             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1976             controller.setCallTimeout(callTimeout);
1977             return ProtobufUtil.toNamespaceDescriptor(
1978               master.getNamespaceDescriptor(controller, GetNamespaceDescriptorRequest.newBuilder().
1979                 setNamespaceName(name).build()).getNamespaceDescriptor());
1980           }
1981         });
1982   }
1983
1984   @Override
1985   public NamespaceDescriptor[] listNamespaceDescriptors() throws IOException {
1986     return
1987         executeCallable(new MasterCallable<NamespaceDescriptor[]>(getConnection()) {
1988           @Override
1989           public NamespaceDescriptor[] call(int callTimeout) throws Exception {
1990             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1991             controller.setCallTimeout(callTimeout);
1992             List<HBaseProtos.NamespaceDescriptor> list =
1993                 master.listNamespaceDescriptors(controller,
1994                   ListNamespaceDescriptorsRequest.newBuilder().build())
1995                 .getNamespaceDescriptorList();
1996             NamespaceDescriptor[] res = new NamespaceDescriptor[list.size()];
1997             for(int i = 0; i < list.size(); i++) {
1998               res[i] = ProtobufUtil.toNamespaceDescriptor(list.get(i));
1999             }
2000             return res;
2001           }
2002         });
2003   }
2004
2005   @Override
2006   public ProcedureInfo[] listProcedures() throws IOException {
2007     return
2008         executeCallable(new MasterCallable<ProcedureInfo[]>(getConnection()) {
2009           @Override
2010           public ProcedureInfo[] call(int callTimeout) throws Exception {
2011             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2012             controller.setCallTimeout(callTimeout);
2013             List<ProcedureProtos.Procedure> procList = master.listProcedures(
2014               controller, ListProceduresRequest.newBuilder().build()).getProcedureList();
2015             ProcedureInfo[] procInfoList = new ProcedureInfo[procList.size()];
2016             for (int i = 0; i < procList.size(); i++) {
2017               procInfoList[i] = ProcedureUtil.convert(procList.get(i));
2018             }
2019             return procInfoList;
2020           }
2021         });
2022   }
2023
2024   @Override
2025   public HTableDescriptor[] listTableDescriptorsByNamespace(final String name) throws IOException {
2026     return
2027         executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) {
2028           @Override
2029           public HTableDescriptor[] call(int callTimeout) throws Exception {
2030             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2031             controller.setCallTimeout(callTimeout);
2032             List<TableSchema> list =
2033                 master.listTableDescriptorsByNamespace(controller,
2034                   ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name)
2035                   .build()).getTableSchemaList();
2036             HTableDescriptor[] res = new HTableDescriptor[list.size()];
2037             for(int i=0; i < list.size(); i++) {
2038
2039               res[i] = ProtobufUtil.convertToHTableDesc(list.get(i));
2040             }
2041             return res;
2042           }
2043         });
2044   }
2045
2046   @Override
2047   public TableName[] listTableNamesByNamespace(final String name) throws IOException {
2048     return
2049         executeCallable(new MasterCallable<TableName[]>(getConnection()) {
2050           @Override
2051           public TableName[] call(int callTimeout) throws Exception {
2052             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2053             controller.setCallTimeout(callTimeout);
2054             List<HBaseProtos.TableName> tableNames =
2055               master.listTableNamesByNamespace(controller, ListTableNamesByNamespaceRequest.
2056                 newBuilder().setNamespaceName(name).build())
2057                 .getTableNameList();
2058             TableName[] result = new TableName[tableNames.size()];
2059             for (int i = 0; i < tableNames.size(); i++) {
2060               result[i] = ProtobufUtil.toTableName(tableNames.get(i));
2061             }
2062             return result;
2063           }
2064         });
2065   }
2066
2067   /**
2068    * Check to see if HBase is running. Throw an exception if not.
2069    * @param conf system configuration
2070    * @throws MasterNotRunningException if the master is not running
2071    * @throws ZooKeeperConnectionException if unable to connect to zookeeper
2072    */
2073   // Used by tests and by the Merge tool. Merge tool uses it to figure if HBase is up or not.
2074   public static void checkHBaseAvailable(Configuration conf)
2075   throws MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException {
2076     Configuration copyOfConf = HBaseConfiguration.create(conf);
2077     // We set it to make it fail as soon as possible if HBase is not available
2078     copyOfConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
2079     copyOfConf.setInt("zookeeper.recovery.retry", 0);
2080     try (ClusterConnection connection =
2081         (ClusterConnection)ConnectionFactory.createConnection(copyOfConf)) {
2082         // Check ZK first.
2083         // If the connection exists, we may have a connection to ZK that does not work anymore
2084         ZooKeeperKeepAliveConnection zkw = null;
2085         try {
2086           // This is NASTY. FIX!!!! Dependent on internal implementation! TODO
2087           zkw = ((ConnectionImplementation)connection).
2088             getKeepAliveZooKeeperWatcher();
2089           zkw.getRecoverableZooKeeper().getZooKeeper().exists(zkw.baseZNode, false);
2090         } catch (IOException e) {
2091           throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
2092         } catch (InterruptedException e) {
2093           throw (InterruptedIOException)
2094             new InterruptedIOException("Can't connect to ZooKeeper").initCause(e);
2095         } catch (KeeperException e) {
2096           throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
2097         } finally {
2098           if (zkw != null) {
2099             zkw.close();
2100           }
2101         }
2102       connection.isMasterRunning();
2103     }
2104   }
2105
2106   @Override
2107   public List<HRegionInfo> getTableRegions(final TableName tableName)
2108   throws IOException {
2109     ZooKeeperWatcher zookeeper =
2110       new ZooKeeperWatcher(conf, ZK_IDENTIFIER_PREFIX + connection.toString(),
2111         new ThrowableAbortable());
2112     List<HRegionInfo> regions = null;
2113     try {
2114       if (TableName.META_TABLE_NAME.equals(tableName)) {
2115         regions = new MetaTableLocator().getMetaRegions(zookeeper);
2116       } else {
2117         regions = MetaTableAccessor.getTableRegions(connection, tableName, true);
2118       }
2119     } finally {
2120       zookeeper.close();
2121     }
2122     return regions;
2123   }
2124
2125   @Override
2126   public synchronized void close() throws IOException {
2127   }
2128
2129   @Override
2130   public HTableDescriptor[] getTableDescriptorsByTableName(final List<TableName> tableNames)
2131   throws IOException {
2132     return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) {
2133       @Override
2134       public HTableDescriptor[] call(int callTimeout) throws Exception {
2135         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2136         controller.setCallTimeout(callTimeout);
2137         GetTableDescriptorsRequest req =
2138             RequestConverter.buildGetTableDescriptorsRequest(tableNames);
2139           return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(controller, req));
2140       }
2141     });
2142   }
2143
2144   /**
2145    * Get tableDescriptor
2146    * @param tableName one table name
2147    * @return HTD the HTableDescriptor or null if the table not exists
2148    * @throws IOException if a remote or network exception occurs
2149    */
2150   private HTableDescriptor getTableDescriptorByTableName(TableName tableName)
2151       throws IOException {
2152     List<TableName> tableNames = new ArrayList<TableName>(1);
2153     tableNames.add(tableName);
2154
2155     HTableDescriptor[] htdl = getTableDescriptorsByTableName(tableNames);
2156
2157     if (htdl == null || htdl.length == 0) {
2158       return null;
2159     }
2160     else {
2161       return htdl[0];
2162     }
2163   }
2164
2165   @Override
2166   public HTableDescriptor[] getTableDescriptors(List<String> names)
2167   throws IOException {
2168     List<TableName> tableNames = new ArrayList<TableName>(names.size());
2169     for(String name : names) {
2170       tableNames.add(TableName.valueOf(name));
2171     }
2172     return getTableDescriptorsByTableName(tableNames);
2173   }
2174
2175   private RollWALWriterResponse rollWALWriterImpl(final ServerName sn) throws IOException,
2176       FailedLogCloseException {
2177     AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
2178     RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest();
2179     PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2180
2181     try {
2182       // TODO: this does not do retries, it should. Set priority and timeout in controller
2183       return admin.rollWALWriter(controller, request);
2184     } catch (ServiceException se) {
2185       throw ProtobufUtil.getRemoteException(se);
2186     }
2187   }
2188
2189   /**
2190    * Roll the log writer. I.e. when using a file system based write ahead log,
2191    * start writing log messages to a new file.
2192    *
2193    * Note that when talking to a version 1.0+ HBase deployment, the rolling is asynchronous.
2194    * This method will return as soon as the roll is requested and the return value will
2195    * always be null. Additionally, the named region server may schedule store flushes at the
2196    * request of the wal handling the roll request.
2197    *
2198    * When talking to a 0.98 or older HBase deployment, the rolling is synchronous and the
2199    * return value may be either null or a list of encoded region names.
2200    *
2201    * @param serverName
2202    *          The servername of the regionserver. A server name is made of host,
2203    *          port and startcode. This is mandatory. Here is an example:
2204    *          <code> host187.example.com,60020,1289493121758</code>
2205    * @return a set of {@link HRegionInfo#getEncodedName()} that would allow the wal to
2206    *         clean up some underlying files. null if there's nothing to flush.
2207    * @throws IOException if a remote or network exception occurs
2208    * @throws FailedLogCloseException
2209    * @deprecated use {@link #rollWALWriter(ServerName)}
2210    */
2211   @Deprecated
2212   public synchronized byte[][] rollHLogWriter(String serverName)
2213       throws IOException, FailedLogCloseException {
2214     ServerName sn = ServerName.valueOf(serverName);
2215     final RollWALWriterResponse response = rollWALWriterImpl(sn);
2216     int regionCount = response.getRegionToFlushCount();
2217     if (0 == regionCount) {
2218       return null;
2219     }
2220     byte[][] regionsToFlush = new byte[regionCount][];
2221     for (int i = 0; i < regionCount; i++) {
2222       ByteString region = response.getRegionToFlush(i);
2223       regionsToFlush[i] = region.toByteArray();
2224     }
2225     return regionsToFlush;
2226   }
2227
2228   @Override
2229   public synchronized void rollWALWriter(ServerName serverName)
2230       throws IOException, FailedLogCloseException {
2231     rollWALWriterImpl(serverName);
2232   }
2233
2234   @Override
2235   public String[] getMasterCoprocessors() {
2236     try {
2237       return getClusterStatus().getMasterCoprocessors();
2238     } catch (IOException e) {
2239       LOG.error("Could not getClusterStatus()",e);
2240       return null;
2241     }
2242   }
2243
2244   @Override
2245   public CompactionState getCompactionState(final TableName tableName)
2246   throws IOException {
2247     return getCompactionState(tableName, CompactType.NORMAL);
2248   }
2249
2250   @Override
2251   public CompactionState getCompactionStateForRegion(final byte[] regionName)
2252   throws IOException {
2253     try {
2254       Pair<HRegionInfo, ServerName> regionServerPair = getRegion(regionName);
2255       if (regionServerPair == null) {
2256         throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName));
2257       }
2258       if (regionServerPair.getSecond() == null) {
2259         throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
2260       }
2261       ServerName sn = regionServerPair.getSecond();
2262       AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
2263       GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
2264         regionServerPair.getFirst().getRegionName(), true);
2265       PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2266       // TODO: this does not do retries, it should. Set priority and timeout in controller
2267       GetRegionInfoResponse response = admin.getRegionInfo(controller, request);
2268       if (response.getCompactionState() != null) {
2269         return ProtobufUtil.createCompactionState(response.getCompactionState());
2270       }
2271       return null;
2272     } catch (ServiceException se) {
2273       throw ProtobufUtil.getRemoteException(se);
2274     }
2275   }
2276
2277   @Override
2278   public void snapshot(final String snapshotName,
2279                        final TableName tableName) throws IOException,
2280       SnapshotCreationException, IllegalArgumentException {
2281     snapshot(snapshotName, tableName, SnapshotType.FLUSH);
2282   }
2283
2284   @Override
2285   public void snapshot(final byte[] snapshotName, final TableName tableName)
2286       throws IOException, SnapshotCreationException, IllegalArgumentException {
2287     snapshot(Bytes.toString(snapshotName), tableName, SnapshotType.FLUSH);
2288   }
2289
2290   @Override
2291   public void snapshot(final String snapshotName, final TableName tableName,
2292       SnapshotType type)
2293       throws IOException, SnapshotCreationException, IllegalArgumentException {
2294     snapshot(new SnapshotDescription(snapshotName, tableName.getNameAsString(), type));
2295   }
2296
2297   @Override
2298   public void snapshot(SnapshotDescription snapshotDesc)
2299       throws IOException, SnapshotCreationException, IllegalArgumentException {
2300     // actually take the snapshot
2301     HBaseProtos.SnapshotDescription snapshot = createHBaseProtosSnapshotDesc(snapshotDesc);
2302     SnapshotResponse response = asyncSnapshot(snapshot);
2303     final IsSnapshotDoneRequest request =
2304         IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build();
2305     IsSnapshotDoneResponse done = null;
2306     long start = EnvironmentEdgeManager.currentTime();
2307     long max = response.getExpectedTimeout();
2308     long maxPauseTime = max / this.numRetries;
2309     int tries = 0;
2310     LOG.debug("Waiting a max of " + max + " ms for snapshot '" +
2311         ClientSnapshotDescriptionUtils.toString(snapshot) + "'' to complete. (max " +
2312         maxPauseTime + " ms per retry)");
2313     while (tries == 0
2314         || ((EnvironmentEdgeManager.currentTime() - start) < max && !done.getDone())) {
2315       try {
2316         // sleep a backoff <= pauseTime amount
2317         long sleep = getPauseTime(tries++);
2318         sleep = sleep > maxPauseTime ? maxPauseTime : sleep;
2319         LOG.debug("(#" + tries + ") Sleeping: " + sleep +
2320           "ms while waiting for snapshot completion.");
2321         Thread.sleep(sleep);
2322       } catch (InterruptedException e) {
2323         throw (InterruptedIOException)new InterruptedIOException("Interrupted").initCause(e);
2324       }
2325       LOG.debug("Getting current status of snapshot from master...");
2326       done = executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection()) {
2327         @Override
2328         public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException {
2329           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2330           controller.setCallTimeout(callTimeout);
2331           return master.isSnapshotDone(controller, request);
2332         }
2333       });
2334     }
2335     if (!done.getDone()) {
2336       throw new SnapshotCreationException("Snapshot '" + snapshot.getName()
2337           + "' wasn't completed in expectedTime:" + max + " ms", snapshotDesc);
2338     }
2339   }
2340
2341   @Override
2342   public void takeSnapshotAsync(SnapshotDescription snapshotDesc) throws IOException,
2343       SnapshotCreationException {
2344     HBaseProtos.SnapshotDescription snapshot = createHBaseProtosSnapshotDesc(snapshotDesc);
2345     asyncSnapshot(snapshot);
2346   }
2347
2348   private HBaseProtos.SnapshotDescription
2349       createHBaseProtosSnapshotDesc(SnapshotDescription snapshotDesc) {
2350     HBaseProtos.SnapshotDescription.Builder builder = HBaseProtos.SnapshotDescription.newBuilder();
2351     if (snapshotDesc.getTable() != null) {
2352       builder.setTable(snapshotDesc.getTable());
2353     }
2354     if (snapshotDesc.getName() != null) {
2355       builder.setName(snapshotDesc.getName());
2356     }
2357     if (snapshotDesc.getOwner() != null) {
2358       builder.setOwner(snapshotDesc.getOwner());
2359     }
2360     if (snapshotDesc.getCreationTime() != -1) {
2361       builder.setCreationTime(snapshotDesc.getCreationTime());
2362     }
2363     if (snapshotDesc.getVersion() != -1) {
2364       builder.setVersion(snapshotDesc.getVersion());
2365     }
2366     builder.setType(ProtobufUtil.createProtosSnapShotDescType(snapshotDesc.getType()));
2367     HBaseProtos.SnapshotDescription snapshot = builder.build();
2368     return snapshot;
2369   }
2370
2371   private SnapshotResponse asyncSnapshot(HBaseProtos.SnapshotDescription snapshot)
2372       throws IOException {
2373     ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2374     final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot)
2375         .build();
2376     // run the snapshot on the master
2377     return executeCallable(new MasterCallable<SnapshotResponse>(getConnection()) {
2378       @Override
2379       public SnapshotResponse call(int callTimeout) throws ServiceException {
2380         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2381         controller.setCallTimeout(callTimeout);
2382         return master.snapshot(controller, request);
2383       }
2384     });
2385   }
2386
2387   @Override
2388   public boolean isSnapshotFinished(final SnapshotDescription snapshotDesc)
2389       throws IOException, HBaseSnapshotException, UnknownSnapshotException {
2390     final HBaseProtos.SnapshotDescription snapshot = createHBaseProtosSnapshotDesc(snapshotDesc);
2391     return executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection()) {
2392       @Override
2393       public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException {
2394         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2395         controller.setCallTimeout(callTimeout);
2396         return master.isSnapshotDone(controller,
2397           IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build());
2398       }
2399     }).getDone();
2400   }
2401
2402   @Override
2403   public void restoreSnapshot(final byte[] snapshotName)
2404       throws IOException, RestoreSnapshotException {
2405     restoreSnapshot(Bytes.toString(snapshotName));
2406   }
2407
2408   @Override
2409   public void restoreSnapshot(final String snapshotName)
2410       throws IOException, RestoreSnapshotException {
2411     boolean takeFailSafeSnapshot =
2412       conf.getBoolean("hbase.snapshot.restore.take.failsafe.snapshot", false);
2413     restoreSnapshot(snapshotName, takeFailSafeSnapshot);
2414   }
2415
2416   @Override
2417   public void restoreSnapshot(final byte[] snapshotName, final boolean takeFailSafeSnapshot)
2418       throws IOException, RestoreSnapshotException {
2419     restoreSnapshot(Bytes.toString(snapshotName), takeFailSafeSnapshot);
2420   }
2421
2422   /*
2423    * Check whether the snapshot exists and contains disabled table
2424    *
2425    * @param snapshotName name of the snapshot to restore
2426    * @throws IOException if a remote or network exception occurs
2427    * @throws RestoreSnapshotException if no valid snapshot is found
2428    */
2429   private TableName getTableNameBeforeRestoreSnapshot(final String snapshotName)
2430       throws IOException, RestoreSnapshotException {
2431     TableName tableName = null;
2432     for (SnapshotDescription snapshotInfo: listSnapshots()) {
2433       if (snapshotInfo.getName().equals(snapshotName)) {
2434         tableName = TableName.valueOf(snapshotInfo.getTable());
2435         break;
2436       }
2437     }
2438
2439     if (tableName == null) {
2440       throw new RestoreSnapshotException(
2441         "Unable to find the table name for snapshot=" + snapshotName);
2442     }
2443     return tableName;
2444   }
2445
2446   @Override
2447   public void restoreSnapshot(final String snapshotName, final boolean takeFailSafeSnapshot)
2448       throws IOException, RestoreSnapshotException {
2449     TableName tableName = getTableNameBeforeRestoreSnapshot(snapshotName);
2450
2451     // The table does not exists, switch to clone.
2452     if (!tableExists(tableName)) {
2453       cloneSnapshot(snapshotName, tableName);
2454       return;
2455     }
2456
2457     // Check if the table is disabled
2458     if (!isTableDisabled(tableName)) {
2459       throw new TableNotDisabledException(tableName);
2460     }
2461
2462     // Take a snapshot of the current state
2463     String failSafeSnapshotSnapshotName = null;
2464     if (takeFailSafeSnapshot) {
2465       failSafeSnapshotSnapshotName = conf.get("hbase.snapshot.restore.failsafe.name",
2466         "hbase-failsafe-{snapshot.name}-{restore.timestamp}");
2467       failSafeSnapshotSnapshotName = failSafeSnapshotSnapshotName
2468         .replace("{snapshot.name}", snapshotName)
2469         .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
2470         .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
2471       LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2472       snapshot(failSafeSnapshotSnapshotName, tableName);
2473     }
2474
2475     try {
2476       // Restore snapshot
2477       get(
2478         internalRestoreSnapshotAsync(snapshotName, tableName),
2479         syncWaitTimeout,
2480         TimeUnit.MILLISECONDS);
2481     } catch (IOException e) {
2482       // Somthing went wrong during the restore...
2483       // if the pre-restore snapshot is available try to rollback
2484       if (takeFailSafeSnapshot) {
2485         try {
2486           get(
2487             internalRestoreSnapshotAsync(failSafeSnapshotSnapshotName, tableName),
2488             syncWaitTimeout,
2489             TimeUnit.MILLISECONDS);
2490           String msg = "Restore snapshot=" + snapshotName +
2491             " failed. Rollback to snapshot=" + failSafeSnapshotSnapshotName + " succeeded.";
2492           LOG.error(msg, e);
2493           throw new RestoreSnapshotException(msg, e);
2494         } catch (IOException ex) {
2495           String msg = "Failed to restore and rollback to snapshot=" + failSafeSnapshotSnapshotName;
2496           LOG.error(msg, ex);
2497           throw new RestoreSnapshotException(msg, e);
2498         }
2499       } else {
2500         throw new RestoreSnapshotException("Failed to restore snapshot=" + snapshotName, e);
2501       }
2502     }
2503
2504     // If the restore is succeeded, delete the pre-restore snapshot
2505     if (takeFailSafeSnapshot) {
2506       try {
2507         LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2508         deleteSnapshot(failSafeSnapshotSnapshotName);
2509       } catch (IOException e) {
2510         LOG.error("Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, e);
2511       }
2512     }
2513   }
2514
2515   @Override
2516   public Future<Void> restoreSnapshotAsync(final String snapshotName)
2517       throws IOException, RestoreSnapshotException {
2518     TableName tableName = getTableNameBeforeRestoreSnapshot(snapshotName);
2519
2520     // The table does not exists, switch to clone.
2521     if (!tableExists(tableName)) {
2522       return cloneSnapshotAsync(snapshotName, tableName);
2523     }
2524
2525     // Check if the table is disabled
2526     if (!isTableDisabled(tableName)) {
2527       throw new TableNotDisabledException(tableName);
2528     }
2529
2530     return internalRestoreSnapshotAsync(snapshotName, tableName);
2531   }
2532
2533   @Override
2534   public void cloneSnapshot(final byte[] snapshotName, final TableName tableName)
2535       throws IOException, TableExistsException, RestoreSnapshotException {
2536     cloneSnapshot(Bytes.toString(snapshotName), tableName);
2537   }
2538
2539   @Override
2540   public void cloneSnapshot(final String snapshotName, final TableName tableName)
2541       throws IOException, TableExistsException, RestoreSnapshotException {
2542     if (tableExists(tableName)) {
2543       throw new TableExistsException(tableName);
2544     }
2545     get(
2546       internalRestoreSnapshotAsync(snapshotName, tableName),
2547       Integer.MAX_VALUE,
2548       TimeUnit.MILLISECONDS);
2549   }
2550
2551   @Override
2552   public Future<Void> cloneSnapshotAsync(final String snapshotName, final TableName tableName)
2553       throws IOException, TableExistsException {
2554     if (tableExists(tableName)) {
2555       throw new TableExistsException(tableName);
2556     }
2557     return internalRestoreSnapshotAsync(snapshotName, tableName);
2558   }
2559
2560   @Override
2561   public byte[] execProcedureWithRet(String signature, String instance, Map<String, String> props)
2562       throws IOException {
2563     ProcedureDescription.Builder builder = ProcedureDescription.newBuilder();
2564     builder.setSignature(signature).setInstance(instance);
2565     for (Entry<String, String> entry : props.entrySet()) {
2566       NameStringPair pair = NameStringPair.newBuilder().setName(entry.getKey())
2567           .setValue(entry.getValue()).build();
2568       builder.addConfiguration(pair);
2569     }
2570
2571     final ExecProcedureRequest request = ExecProcedureRequest.newBuilder()
2572         .setProcedure(builder.build()).build();
2573     // run the procedure on the master
2574     ExecProcedureResponse response = executeCallable(new MasterCallable<ExecProcedureResponse>(
2575         getConnection()) {
2576       @Override
2577       public ExecProcedureResponse call(int callTimeout) throws ServiceException {
2578         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2579         controller.setCallTimeout(callTimeout);
2580         return master.execProcedureWithRet(controller, request);
2581       }
2582     });
2583
2584     return response.hasReturnData() ? response.getReturnData().toByteArray() : null;
2585   }
2586
2587   @Override
2588   public void execProcedure(String signature, String instance, Map<String, String> props)
2589       throws IOException {
2590     ProcedureDescription.Builder builder = ProcedureDescription.newBuilder();
2591     builder.setSignature(signature).setInstance(instance);
2592     for (Entry<String, String> entry : props.entrySet()) {
2593       NameStringPair pair = NameStringPair.newBuilder().setName(entry.getKey())
2594           .setValue(entry.getValue()).build();
2595       builder.addConfiguration(pair);
2596     }
2597
2598     final ExecProcedureRequest request = ExecProcedureRequest.newBuilder()
2599         .setProcedure(builder.build()).build();
2600     // run the procedure on the master
2601     ExecProcedureResponse response = executeCallable(new MasterCallable<ExecProcedureResponse>(
2602         getConnection()) {
2603       @Override
2604       public ExecProcedureResponse call(int callTimeout) throws ServiceException {
2605         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2606         controller.setCallTimeout(callTimeout);
2607         return master.execProcedure(controller, request);
2608       }
2609     });
2610
2611     long start = EnvironmentEdgeManager.currentTime();
2612     long max = response.getExpectedTimeout();
2613     long maxPauseTime = max / this.numRetries;
2614     int tries = 0;
2615     LOG.debug("Waiting a max of " + max + " ms for procedure '" +
2616         signature + " : " + instance + "'' to complete. (max " + maxPauseTime + " ms per retry)");
2617     boolean done = false;
2618     while (tries == 0
2619         || ((EnvironmentEdgeManager.currentTime() - start) < max && !done)) {
2620       try {
2621         // sleep a backoff <= pauseTime amount
2622         long sleep = getPauseTime(tries++);
2623         sleep = sleep > maxPauseTime ? maxPauseTime : sleep;
2624         LOG.debug("(#" + tries + ") Sleeping: " + sleep +
2625           "ms while waiting for procedure completion.");
2626         Thread.sleep(sleep);
2627       } catch (InterruptedException e) {
2628         throw (InterruptedIOException)new InterruptedIOException("Interrupted").initCause(e);
2629       }
2630       LOG.debug("Getting current status of procedure from master...");
2631       done = isProcedureFinished(signature, instance, props);
2632     }
2633     if (!done) {
2634       throw new IOException("Procedure '" + signature + " : " + instance
2635           + "' wasn't completed in expectedTime:" + max + " ms");
2636     }
2637   }
2638
2639   @Override
2640   public boolean isProcedureFinished(String signature, String instance, Map<String, String> props)
2641       throws IOException {
2642     final ProcedureDescription.Builder builder = ProcedureDescription.newBuilder();
2643     builder.setSignature(signature).setInstance(instance);
2644     for (Entry<String, String> entry : props.entrySet()) {
2645       NameStringPair pair = NameStringPair.newBuilder().setName(entry.getKey())
2646           .setValue(entry.getValue()).build();
2647       builder.addConfiguration(pair);
2648     }
2649     final ProcedureDescription desc = builder.build();
2650     return executeCallable(
2651         new MasterCallable<IsProcedureDoneResponse>(getConnection()) {
2652           @Override
2653           public IsProcedureDoneResponse call(int callTimeout) throws ServiceException {
2654             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2655             controller.setCallTimeout(callTimeout);
2656             return master.isProcedureDone(controller, IsProcedureDoneRequest
2657                 .newBuilder().setProcedure(desc).build());
2658           }
2659         }).getDone();
2660   }
2661
2662   /**
2663    * Execute Restore/Clone snapshot and wait for the server to complete (blocking).
2664    * To check if the cloned table exists, use {@link #isTableAvailable} -- it is not safe to
2665    * create an HTable instance to this table before it is available.
2666    * @param snapshotName snapshot to restore
2667    * @param tableName table name to restore the snapshot on
2668    * @throws IOException if a remote or network exception occurs
2669    * @throws RestoreSnapshotException if snapshot failed to be restored
2670    * @throws IllegalArgumentException if the restore request is formatted incorrectly
2671    */
2672   private Future<Void> internalRestoreSnapshotAsync(
2673       final String snapshotName,
2674       final TableName tableName) throws IOException, RestoreSnapshotException {
2675     final HBaseProtos.SnapshotDescription snapshot = HBaseProtos.SnapshotDescription.newBuilder()
2676         .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2677
2678     // actually restore the snapshot
2679     ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2680
2681     RestoreSnapshotResponse response = executeCallable(
2682         new MasterCallable<RestoreSnapshotResponse>(getConnection()) {
2683       @Override
2684       public RestoreSnapshotResponse call(int callTimeout) throws ServiceException {
2685         final RestoreSnapshotRequest request = RestoreSnapshotRequest.newBuilder()
2686             .setSnapshot(snapshot)
2687             .setNonceGroup(ng.getNonceGroup())
2688             .setNonce(ng.newNonce())
2689             .build();
2690         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2691         controller.setCallTimeout(callTimeout);
2692         return master.restoreSnapshot(controller, request);
2693       }
2694     });
2695
2696     return new RestoreSnapshotFuture(
2697       this, snapshot, TableName.valueOf(snapshot.getTable()), response);
2698   }
2699
2700   private static class RestoreSnapshotFuture extends TableFuture<Void> {
2701     public RestoreSnapshotFuture(
2702         final HBaseAdmin admin,
2703         final HBaseProtos.SnapshotDescription snapshot,
2704         final TableName tableName,
2705         final RestoreSnapshotResponse response) {
2706       super(admin, tableName,
2707           (response != null && response.hasProcId()) ? response.getProcId() : null);
2708
2709       if (response != null && !response.hasProcId()) {
2710         throw new UnsupportedOperationException("Client could not call old version of Server");
2711       }
2712     }
2713
2714     public RestoreSnapshotFuture(
2715         final HBaseAdmin admin,
2716         final TableName tableName,
2717         final Long procId) {
2718       super(admin, tableName, procId);
2719     }
2720
2721     @Override
2722     public String getOperationType() {
2723       return "MODIFY";
2724     }
2725   }
2726
2727   @Override
2728   public List<SnapshotDescription> listSnapshots() throws IOException {
2729     return executeCallable(new MasterCallable<List<SnapshotDescription>>(getConnection()) {
2730       @Override
2731       public List<SnapshotDescription> call(int callTimeout) throws ServiceException {
2732         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2733         controller.setCallTimeout(callTimeout);
2734         List<HBaseProtos.SnapshotDescription> snapshotsList = master
2735             .getCompletedSnapshots(controller, GetCompletedSnapshotsRequest.newBuilder().build())
2736             .getSnapshotsList();
2737         List<SnapshotDescription> result = new ArrayList<SnapshotDescription>(snapshotsList.size());
2738         for (HBaseProtos.SnapshotDescription snapshot : snapshotsList) {
2739           result.add(new SnapshotDescription(snapshot.getName(), snapshot.getTable(),
2740               ProtobufUtil.createSnapshotType(snapshot.getType()), snapshot.getOwner(),
2741               snapshot.getCreationTime(), snapshot.getVersion()));
2742         }
2743         return result;
2744       }
2745     });
2746   }
2747
2748   @Override
2749   public List<SnapshotDescription> listSnapshots(String regex) throws IOException {
2750     return listSnapshots(Pattern.compile(regex));
2751   }
2752
2753   @Override
2754   public List<SnapshotDescription> listSnapshots(Pattern pattern) throws IOException {
2755     List<SnapshotDescription> matched = new LinkedList<SnapshotDescription>();
2756     List<SnapshotDescription> snapshots = listSnapshots();
2757     for (SnapshotDescription snapshot : snapshots) {
2758       if (pattern.matcher(snapshot.getName()).matches()) {
2759         matched.add(snapshot);
2760       }
2761     }
2762     return matched;
2763   }
2764
2765   @Override
2766   public List<SnapshotDescription> listTableSnapshots(String tableNameRegex,
2767       String snapshotNameRegex) throws IOException {
2768     return listTableSnapshots(Pattern.compile(tableNameRegex), Pattern.compile(snapshotNameRegex));
2769   }
2770
2771   @Override
2772   public List<SnapshotDescription> listTableSnapshots(Pattern tableNamePattern,
2773       Pattern snapshotNamePattern) throws IOException {
2774     TableName[] tableNames = listTableNames(tableNamePattern);
2775
2776     List<SnapshotDescription> tableSnapshots = new LinkedList<SnapshotDescription>();
2777     List<SnapshotDescription> snapshots = listSnapshots(snapshotNamePattern);
2778
2779     List<TableName> listOfTableNames = Arrays.asList(tableNames);
2780     for (SnapshotDescription snapshot : snapshots) {
2781       if (listOfTableNames.contains(TableName.valueOf(snapshot.getTable()))) {
2782         tableSnapshots.add(snapshot);
2783       }
2784     }
2785     return tableSnapshots;
2786   }
2787
2788   @Override
2789   public void deleteSnapshot(final byte[] snapshotName) throws IOException {
2790     deleteSnapshot(Bytes.toString(snapshotName));
2791   }
2792
2793   @Override
2794   public void deleteSnapshot(final String snapshotName) throws IOException {
2795     // make sure the snapshot is possibly valid
2796     TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(snapshotName));
2797     // do the delete
2798     executeCallable(new MasterCallable<Void>(getConnection()) {
2799       @Override
2800       public Void call(int callTimeout) throws ServiceException {
2801         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2802         controller.setCallTimeout(callTimeout);
2803         master.deleteSnapshot(controller,
2804           DeleteSnapshotRequest.newBuilder().
2805               setSnapshot(
2806                 HBaseProtos.SnapshotDescription.newBuilder().setName(snapshotName).build())
2807               .build()
2808         );
2809         return null;
2810       }
2811     });
2812   }
2813
2814   @Override
2815   public void deleteSnapshots(final String regex) throws IOException {
2816     deleteSnapshots(Pattern.compile(regex));
2817   }
2818
2819   @Override
2820   public void deleteSnapshots(final Pattern pattern) throws IOException {
2821     List<SnapshotDescription> snapshots = listSnapshots(pattern);
2822     for (final SnapshotDescription snapshot : snapshots) {
2823       try {
2824         internalDeleteSnapshot(snapshot);
2825       } catch (IOException ex) {
2826         LOG.info(
2827           "Failed to delete snapshot " + snapshot.getName() + " for table " + snapshot.getTable(),
2828           ex);
2829       }
2830     }
2831   }
2832
2833   private void internalDeleteSnapshot(final SnapshotDescription snapshot) throws IOException {
2834     executeCallable(new MasterCallable<Void>(getConnection()) {
2835       @Override
2836       public Void call(int callTimeout) throws ServiceException {
2837         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2838         controller.setCallTimeout(callTimeout);
2839         this.master.deleteSnapshot(controller, DeleteSnapshotRequest.newBuilder()
2840           .setSnapshot(createHBaseProtosSnapshotDesc(snapshot)).build());
2841         return null;
2842       }
2843     });
2844   }
2845
2846   @Override
2847   public void deleteTableSnapshots(String tableNameRegex, String snapshotNameRegex)
2848       throws IOException {
2849     deleteTableSnapshots(Pattern.compile(tableNameRegex), Pattern.compile(snapshotNameRegex));
2850   }
2851
2852   @Override
2853   public void deleteTableSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern)
2854       throws IOException {
2855     List<SnapshotDescription> snapshots = listTableSnapshots(tableNamePattern, snapshotNamePattern);
2856     for (SnapshotDescription snapshot : snapshots) {
2857       try {
2858         internalDeleteSnapshot(snapshot);
2859         LOG.debug("Successfully deleted snapshot: " + snapshot.getName());
2860       } catch (IOException e) {
2861         LOG.error("Failed to delete snapshot: " + snapshot.getName(), e);
2862       }
2863     }
2864   }
2865
2866   @Override
2867   public void setQuota(final QuotaSettings quota) throws IOException {
2868     executeCallable(new MasterCallable<Void>(getConnection()) {
2869       @Override
2870       public Void call(int callTimeout) throws ServiceException {
2871         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2872         controller.setCallTimeout(callTimeout);
2873         this.master.setQuota(controller, QuotaSettings.buildSetQuotaRequestProto(quota));
2874         return null;
2875       }
2876     });
2877   }
2878
2879   @Override
2880   public QuotaRetriever getQuotaRetriever(final QuotaFilter filter) throws IOException {
2881     return QuotaRetriever.open(conf, filter);
2882   }
2883
2884   private <C extends RetryingCallable<V> & Closeable, V> V executeCallable(C callable)
2885       throws IOException {
2886     return executeCallable(callable, rpcCallerFactory, operationTimeout, rpcTimeout);
2887   }
2888
2889   static private <C extends RetryingCallable<V> & Closeable, V> V executeCallable(C callable,
2890              RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout,
2891       int rpcTimeout) throws IOException {
2892     RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller(rpcTimeout);
2893     try {
2894       return caller.callWithRetries(callable, operationTimeout);
2895     } finally {
2896       callable.close();
2897     }
2898   }
2899
2900   @Override
2901   public CoprocessorRpcChannel coprocessorService() {
2902     return new MasterCoprocessorRpcChannel(connection);
2903   }
2904
2905   /**
2906    * Simple {@link Abortable}, throwing RuntimeException on abort.
2907    */
2908   private static class ThrowableAbortable implements Abortable {
2909
2910     @Override
2911     public void abort(String why, Throwable e) {
2912       throw new RuntimeException(why, e);
2913     }
2914
2915     @Override
2916     public boolean isAborted() {
2917       return true;
2918     }
2919   }
2920
2921   @Override
2922   public CoprocessorRpcChannel coprocessorService(ServerName sn) {
2923     return new RegionServerCoprocessorRpcChannel(connection, sn);
2924   }
2925
2926   @Override
2927   public void updateConfiguration(ServerName server) throws IOException {
2928     try {
2929       this.connection.getAdmin(server).updateConfiguration(null,
2930         UpdateConfigurationRequest.getDefaultInstance());
2931     } catch (ServiceException e) {
2932       throw ProtobufUtil.getRemoteException(e);
2933     }
2934   }
2935
2936   @Override
2937   public void updateConfiguration() throws IOException {
2938     for (ServerName server : this.getClusterStatus().getServers()) {
2939       updateConfiguration(server);
2940     }
2941   }
2942
2943   @Override
2944   public int getMasterInfoPort() throws IOException {
2945     // TODO: Fix!  Reaching into internal implementation!!!!
2946     ConnectionImplementation connection =
2947         (ConnectionImplementation)this.connection;
2948     ZooKeeperKeepAliveConnection zkw = connection.getKeepAliveZooKeeperWatcher();
2949     try {
2950       return MasterAddressTracker.getMasterInfoPort(zkw);
2951     } catch (KeeperException e) {
2952       throw new IOException("Failed to get master info port from MasterAddressTracker", e);
2953     }
2954   }
2955
2956   private ServerName getMasterAddress() throws IOException {
2957     // TODO: Fix!  Reaching into internal implementation!!!!
2958     ConnectionImplementation connection =
2959             (ConnectionImplementation)this.connection;
2960     ZooKeeperKeepAliveConnection zkw = connection.getKeepAliveZooKeeperWatcher();
2961     try {
2962       return MasterAddressTracker.getMasterAddress(zkw);
2963     } catch (KeeperException e) {
2964       throw new IOException("Failed to get master server name from MasterAddressTracker", e);
2965     }
2966   }
2967
2968   @Override
2969   public long getLastMajorCompactionTimestamp(final TableName tableName) throws IOException {
2970     return executeCallable(new MasterCallable<Long>(getConnection()) {
2971       @Override
2972       public Long call(int callTimeout) throws ServiceException {
2973         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2974         controller.setCallTimeout(callTimeout);
2975         MajorCompactionTimestampRequest req =
2976             MajorCompactionTimestampRequest.newBuilder()
2977                 .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
2978         return master.getLastMajorCompactionTimestamp(controller, req).getCompactionTimestamp();
2979       }
2980     });
2981   }
2982
2983   @Override
2984   public long getLastMajorCompactionTimestampForRegion(final byte[] regionName) throws IOException {
2985     return executeCallable(new MasterCallable<Long>(getConnection()) {
2986       @Override
2987       public Long call(int callTimeout) throws ServiceException {
2988         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2989         controller.setCallTimeout(callTimeout);
2990         MajorCompactionTimestampForRegionRequest req =
2991             MajorCompactionTimestampForRegionRequest
2992                 .newBuilder()
2993                 .setRegion(
2994                   RequestConverter
2995                       .buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)).build();
2996         return master.getLastMajorCompactionTimestampForRegion(controller, req)
2997             .getCompactionTimestamp();
2998       }
2999     });
3000   }
3001
3002   /**
3003    * {@inheritDoc}
3004    */
3005   @Override
3006   public void compact(final TableName tableName, final byte[] columnFamily, CompactType compactType)
3007     throws IOException, InterruptedException {
3008     compact(tableName, columnFamily, false, compactType);
3009   }
3010
3011   /**
3012    * {@inheritDoc}
3013    */
3014   @Override
3015   public void compact(final TableName tableName, CompactType compactType)
3016     throws IOException, InterruptedException {
3017     compact(tableName, null, false, compactType);
3018   }
3019
3020   /**
3021    * {@inheritDoc}
3022    */
3023   @Override
3024   public void majorCompact(final TableName tableName, final byte[] columnFamily,
3025     CompactType compactType) throws IOException, InterruptedException {
3026     compact(tableName, columnFamily, true, compactType);
3027   }
3028
3029   /**
3030    * {@inheritDoc}
3031    */
3032   @Override
3033   public void majorCompact(final TableName tableName, CompactType compactType)
3034           throws IOException, InterruptedException {
3035       compact(tableName, null, true, compactType);
3036   }
3037
3038   /**
3039    * {@inheritDoc}
3040    */
3041   @Override
3042   public CompactionState getCompactionState(TableName tableName,
3043     CompactType compactType) throws IOException {
3044     AdminProtos.GetRegionInfoResponse.CompactionState state =
3045         AdminProtos.GetRegionInfoResponse.CompactionState.NONE;
3046     checkTableExists(tableName);
3047     PayloadCarryingRpcController controller = rpcControllerFactory.newController();
3048     switch (compactType) {
3049       case MOB:
3050         try {
3051           ServerName master = getMasterAddress();
3052           HRegionInfo info = getMobRegionInfo(tableName);
3053           GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
3054                   info.getRegionName(), true);
3055           GetRegionInfoResponse response = this.connection.getAdmin(master)
3056                   .getRegionInfo(controller, request);
3057           state = response.getCompactionState();
3058         } catch (ServiceException se) {
3059           throw ProtobufUtil.getRemoteException(se);
3060         }
3061         break;
3062       case NORMAL:
3063       default:
3064         ZooKeeperWatcher zookeeper = null;
3065         try {
3066           List<Pair<HRegionInfo, ServerName>> pairs;
3067           if (TableName.META_TABLE_NAME.equals(tableName)) {
3068             zookeeper = new ZooKeeperWatcher(conf, ZK_IDENTIFIER_PREFIX + connection.toString(),
3069               new ThrowableAbortable());
3070             pairs = new MetaTableLocator().getMetaRegionsAndLocations(zookeeper);
3071           } else {
3072             pairs = MetaTableAccessor.getTableRegionsAndLocations(connection, tableName);
3073           }
3074           for (Pair<HRegionInfo, ServerName> pair : pairs) {
3075             if (pair.getFirst().isOffline()) continue;
3076             if (pair.getSecond() == null) continue;
3077             try {
3078               ServerName sn = pair.getSecond();
3079               AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
3080               GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
3081                       pair.getFirst().getRegionName(), true);
3082               GetRegionInfoResponse response = admin.getRegionInfo(controller, request);
3083               switch (response.getCompactionState()) {
3084                 case MAJOR_AND_MINOR:
3085                   return CompactionState.MAJOR_AND_MINOR;
3086                 case MAJOR:
3087                   if (state == AdminProtos.GetRegionInfoResponse.CompactionState.MINOR) {
3088                     return CompactionState.MAJOR_AND_MINOR;
3089                   }
3090                   state = AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR;
3091                   break;
3092                 case MINOR:
3093                   if (state == AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR) {
3094                     return CompactionState.MAJOR_AND_MINOR;
3095                   }
3096                   state = AdminProtos.GetRegionInfoResponse.CompactionState.MINOR;
3097                   break;
3098                 case NONE:
3099                 default: // nothing, continue
3100               }
3101             } catch (NotServingRegionException e) {
3102               if (LOG.isDebugEnabled()) {
3103                 LOG.debug("Trying to get compaction state of " +
3104                         pair.getFirst() + ": " +
3105                         StringUtils.stringifyException(e));
3106               }
3107             } catch (RemoteException e) {
3108               if (e.getMessage().indexOf(NotServingRegionException.class.getName()) >= 0) {
3109                 if (LOG.isDebugEnabled()) {
3110                   LOG.debug("Trying to get compaction state of " + pair.getFirst() + ": "
3111                           + StringUtils.stringifyException(e));
3112                 }
3113               } else {
3114                 throw e;
3115               }
3116             }
3117           }
3118         } catch (ServiceException se) {
3119           throw ProtobufUtil.getRemoteException(se);
3120         } finally {
3121           if (zookeeper != null) {
3122             zookeeper.close();
3123           }
3124         }
3125         break;
3126     }
3127     if(state != null) {
3128       return ProtobufUtil.createCompactionState(state);
3129     }
3130     return null;
3131   }
3132
3133   /**
3134    * Future that waits on a procedure result.
3135    * Returned by the async version of the Admin calls,
3136    * and used internally by the sync calls to wait on the result of the procedure.
3137    */
3138   @InterfaceAudience.Private
3139   @InterfaceStability.Evolving
3140   protected static class ProcedureFuture<V> implements Future<V> {
3141     private ExecutionException exception = null;
3142     private boolean procResultFound = false;
3143     private boolean done = false;
3144     private boolean cancelled = false;
3145     private V result = null;
3146
3147     private final HBaseAdmin admin;
3148     private final Long procId;
3149
3150     public ProcedureFuture(final HBaseAdmin admin, final Long procId) {
3151       this.admin = admin;
3152       this.procId = procId;
3153     }
3154
3155     @Override
3156     public boolean cancel(boolean mayInterruptIfRunning) {
3157       AbortProcedureRequest abortProcRequest = AbortProcedureRequest.newBuilder()
3158           .setProcId(procId).setMayInterruptIfRunning(mayInterruptIfRunning).build();
3159       try {
3160         cancelled = abortProcedureResult(abortProcRequest).getIsProcedureAborted();
3161         if (cancelled) {
3162           done = true;
3163         }
3164       } catch (IOException e) {
3165         // Cancell thrown exception for some reason. At this time, we are not sure whether
3166         // the cancell succeeds or fails. We assume that it is failed, but print out a warning
3167         // for debugging purpose.
3168         LOG.warn(
3169           "Cancelling the procedure with procId=" + procId + " throws exception " + e.getMessage(),
3170           e);
3171         cancelled = false;
3172       }
3173       return cancelled;
3174     }
3175
3176     @Override
3177     public boolean isCancelled() {
3178       return cancelled;
3179     }
3180
3181     protected AbortProcedureResponse abortProcedureResult(
3182         final AbortProcedureRequest request) throws IOException {
3183       return admin.executeCallable(new MasterCallable<AbortProcedureResponse>(
3184           admin.getConnection()) {
3185         @Override
3186         public AbortProcedureResponse call(int callTimeout) throws ServiceException {
3187           PayloadCarryingRpcController controller = admin.getRpcControllerFactory().newController();
3188           controller.setCallTimeout(callTimeout);
3189           return master.abortProcedure(controller, request);
3190         }
3191       });
3192     }
3193
3194     @Override
3195     public V get() throws InterruptedException, ExecutionException {
3196       // TODO: should we ever spin forever?
3197       throw new UnsupportedOperationException();
3198     }
3199
3200     @Override
3201     public V get(long timeout, TimeUnit unit)
3202         throws InterruptedException, ExecutionException, TimeoutException {
3203       if (!done) {
3204         long deadlineTs = EnvironmentEdgeManager.currentTime() + unit.toMillis(timeout);
3205         try {
3206           try {
3207             // if the master support procedures, try to wait the result
3208             if (procId != null) {
3209               result = waitProcedureResult(procId, deadlineTs);
3210             }
3211             // if we don't have a proc result, try the compatibility wait
3212             if (!procResultFound) {
3213               result = waitOperationResult(deadlineTs);
3214             }
3215             result = postOperationResult(result, deadlineTs);
3216             done = true;
3217           } catch (IOException e) {
3218             result = postOperationFailure(e, deadlineTs);
3219             done = true;
3220           }
3221         } catch (IOException e) {
3222           exception = new ExecutionException(e);
3223           done = true;
3224         }
3225       }
3226       if (exception != null) {
3227         throw exception;
3228       }
3229       return result;
3230     }
3231
3232     @Override
3233     public boolean isDone() {
3234       return done;
3235     }
3236
3237     protected HBaseAdmin getAdmin() {
3238       return admin;
3239     }
3240
3241     private V waitProcedureResult(long procId, long deadlineTs)
3242         throws IOException, TimeoutException, InterruptedException {
3243       GetProcedureResultRequest request = GetProcedureResultRequest.newBuilder()
3244           .setProcId(procId)
3245           .build();
3246
3247       int tries = 0;
3248       IOException serviceEx = null;
3249       while (EnvironmentEdgeManager.currentTime() < deadlineTs) {
3250         GetProcedureResultResponse response = null;
3251         try {
3252           // Try to fetch the result
3253           response = getProcedureResult(request);
3254         } catch (IOException e) {
3255           serviceEx = unwrapException(e);
3256
3257           // the master may be down
3258           LOG.warn("failed to get the procedure result procId=" + procId, serviceEx);
3259
3260           // Not much to do, if we have a DoNotRetryIOException
3261           if (serviceEx instanceof DoNotRetryIOException) {
3262             // TODO: looks like there is no way to unwrap this exception and get the proper
3263             // UnsupportedOperationException aside from looking at the message.
3264             // anyway, if we fail here we just failover to the compatibility side
3265             // and that is always a valid solution.
3266             LOG.warn("Proc-v2 is unsupported on this master: " + serviceEx.getMessage(), serviceEx);
3267             procResultFound = false;
3268             return null;
3269           }
3270         }
3271
3272         // If the procedure is no longer running, we should have a result
3273         if (response != null && response.getState() != GetProcedureResultResponse.State.RUNNING) {
3274           procResultFound = response.getState() != GetProcedureResultResponse.State.NOT_FOUND;
3275           return convertResult(response);
3276         }
3277
3278         try {
3279           Thread.sleep(getAdmin().getPauseTime(tries++));
3280         } catch (InterruptedException e) {
3281           throw new InterruptedException(
3282             "Interrupted while waiting for the result of proc " + procId);
3283         }
3284       }
3285       if (serviceEx != null) {
3286         throw serviceEx;
3287       } else {
3288         throw new TimeoutException("The procedure " + procId + " is still running");
3289       }
3290     }
3291
3292     private static IOException unwrapException(IOException e) {
3293       if (e instanceof RemoteException) {
3294         return ((RemoteException)e).unwrapRemoteException();
3295       }
3296       return e;
3297     }
3298
3299     protected GetProcedureResultResponse getProcedureResult(final GetProcedureResultRequest request)
3300         throws IOException {
3301       return admin.executeCallable(new MasterCallable<GetProcedureResultResponse>(
3302           admin.getConnection()) {
3303         @Override
3304         public GetProcedureResultResponse call(int callTimeout) throws ServiceException {
3305           return master.getProcedureResult(null, request);
3306         }
3307       });
3308     }
3309
3310     /**
3311      * Convert the procedure result response to a specified type.
3312      * @param response the procedure result object to parse
3313      * @return the result data of the procedure.
3314      */
3315     protected V convertResult(final GetProcedureResultResponse response) throws IOException {
3316       if (response.hasException()) {
3317         throw ForeignExceptionUtil.toIOException(response.getException());
3318       }
3319       return null;
3320     }
3321
3322     /**
3323      * Fallback implementation in case the procedure is not supported by the server.
3324      * It should try to wait until the operation is completed.
3325      * @param deadlineTs the timestamp after which this method should throw a TimeoutException
3326      * @return the result data of the operation
3327      */
3328     protected V waitOperationResult(final long deadlineTs)
3329         throws IOException, TimeoutException {
3330       return null;
3331     }
3332
3333     /**
3334      * Called after the operation is completed and the result fetched. this allows to perform extra
3335      * steps after the procedure is completed. it allows to apply transformations to the result that
3336      * will be returned by get().
3337      * @param result the result of the procedure
3338      * @param deadlineTs the timestamp after which this method should throw a TimeoutException
3339      * @return the result of the procedure, which may be the same as the passed one
3340      */
3341     protected V postOperationResult(final V result, final long deadlineTs)
3342         throws IOException, TimeoutException {
3343       return result;
3344     }
3345
3346     /**
3347      * Called after the operation is terminated with a failure.
3348      * this allows to perform extra steps after the procedure is terminated.
3349      * it allows to apply transformations to the result that will be returned by get().
3350      * The default implementation will rethrow the exception
3351      * @param exception the exception got from fetching the result
3352      * @param deadlineTs the timestamp after which this method should throw a TimeoutException
3353      * @return the result of the procedure, which may be the same as the passed one
3354      */
3355     protected V postOperationFailure(final IOException exception, final long deadlineTs)
3356         throws IOException, TimeoutException {
3357       throw exception;
3358     }
3359
3360     protected interface WaitForStateCallable {
3361       boolean checkState(int tries) throws IOException;
3362       void throwInterruptedException() throws InterruptedIOException;
3363       void throwTimeoutException(long elapsed) throws TimeoutException;
3364     }
3365
3366     protected void waitForState(final long deadlineTs, final WaitForStateCallable callable)
3367         throws IOException, TimeoutException {
3368       int tries = 0;
3369       IOException serverEx = null;
3370       long startTime = EnvironmentEdgeManager.currentTime();
3371       while (EnvironmentEdgeManager.currentTime() < deadlineTs) {
3372         serverEx = null;
3373         try {
3374           if (callable.checkState(tries)) {
3375             return;
3376           }
3377         } catch (IOException e) {
3378           serverEx = e;
3379         }
3380         try {
3381           Thread.sleep(getAdmin().getPauseTime(tries++));
3382         } catch (InterruptedException e) {
3383           callable.throwInterruptedException();
3384         }
3385       }
3386       if (serverEx != null) {
3387         throw unwrapException(serverEx);
3388       } else {
3389         callable.throwTimeoutException(EnvironmentEdgeManager.currentTime() - startTime);
3390       }
3391     }
3392   }
3393
3394   @InterfaceAudience.Private
3395   @InterfaceStability.Evolving
3396   protected static abstract class TableFuture<V> extends ProcedureFuture<V> {
3397     private final TableName tableName;
3398
3399     public TableFuture(final HBaseAdmin admin, final TableName tableName, final Long procId) {
3400       super(admin, procId);
3401       this.tableName = tableName;
3402     }
3403
3404     @Override
3405     public String toString() {
3406       return getDescription();
3407     }
3408
3409     /**
3410      * @return the table name
3411      */
3412     protected TableName getTableName() {
3413       return tableName;
3414     }
3415
3416     /**
3417      * @return the table descriptor
3418      */
3419     protected HTableDescriptor getTableDescriptor() throws IOException {
3420       return getAdmin().getTableDescriptorByTableName(getTableName());
3421     }
3422
3423     /**
3424      * @return the operation type like CREATE, DELETE, DISABLE etc.
3425      */
3426     public abstract String getOperationType();
3427
3428     /**
3429      * @return a description of the operation
3430      */
3431     protected String getDescription() {
3432       return "Operation: " + getOperationType() + ", "
3433           + "Table Name: " + tableName.getNameWithNamespaceInclAsString();
3434
3435     };
3436
3437     protected abstract class TableWaitForStateCallable implements WaitForStateCallable {
3438       @Override
3439       public void throwInterruptedException() throws InterruptedIOException {
3440         throw new InterruptedIOException("Interrupted while waiting for operation: "
3441             + getOperationType() + " on table: " + tableName.getNameWithNamespaceInclAsString());
3442       }
3443
3444       @Override
3445       public void throwTimeoutException(long elapsedTime) throws TimeoutException {
3446         throw new TimeoutException("The operation: " + getOperationType() + " on table: " +
3447             tableName.getNameAsString() + " has not completed after " + elapsedTime + "ms");
3448       }
3449     }
3450
3451     @Override
3452     protected V postOperationResult(final V result, final long deadlineTs)
3453         throws IOException, TimeoutException {
3454       LOG.info(getDescription() + " completed");
3455       return super.postOperationResult(result, deadlineTs);
3456     }
3457
3458     @Override
3459     protected V postOperationFailure(final IOException exception, final long deadlineTs)
3460         throws IOException, TimeoutException {
3461       LOG.info(getDescription() + " failed with " + exception.getMessage());
3462       return super.postOperationFailure(exception, deadlineTs);
3463     }
3464
3465     protected void waitForTableEnabled(final long deadlineTs)
3466         throws IOException, TimeoutException {
3467       waitForState(deadlineTs, new TableWaitForStateCallable() {
3468         @Override
3469         public boolean checkState(int tries) throws IOException {
3470           try {
3471             if (getAdmin().isTableAvailable(tableName)) {
3472               return true;
3473             }
3474           } catch (TableNotFoundException tnfe) {
3475             LOG.debug("Table " + tableName.getNameWithNamespaceInclAsString()
3476                 + " was not enabled, sleeping. tries=" + tries);
3477           }
3478           return false;
3479         }
3480       });
3481     }
3482
3483     protected void waitForTableDisabled(final long deadlineTs)
3484         throws IOException, TimeoutException {
3485       waitForState(deadlineTs, new TableWaitForStateCallable() {
3486         @Override
3487         public boolean checkState(int tries) throws IOException {
3488           return getAdmin().isTableDisabled(tableName);
3489         }
3490       });
3491     }
3492
3493     protected void waitTableNotFound(final long deadlineTs)
3494         throws IOException, TimeoutException {
3495       waitForState(deadlineTs, new TableWaitForStateCallable() {
3496         @Override
3497         public boolean checkState(int tries) throws IOException {
3498           return !getAdmin().tableExists(tableName);
3499         }
3500       });
3501     }
3502
3503     protected void waitForSchemaUpdate(final long deadlineTs)
3504         throws IOException, TimeoutException {
3505       waitForState(deadlineTs, new TableWaitForStateCallable() {
3506         @Override
3507         public boolean checkState(int tries) throws IOException {
3508           return getAdmin().getAlterStatus(tableName).getFirst() == 0;
3509         }
3510       });
3511     }
3512
3513     protected void waitForAllRegionsOnline(final long deadlineTs, final byte[][] splitKeys)
3514         throws IOException, TimeoutException {
3515       final HTableDescriptor desc = getTableDescriptor();
3516       final AtomicInteger actualRegCount = new AtomicInteger(0);
3517       final MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
3518         @Override
3519         public boolean visit(Result rowResult) throws IOException {
3520           RegionLocations list = MetaTableAccessor.getRegionLocations(rowResult);
3521           if (list == null) {
3522             LOG.warn("No serialized HRegionInfo in " + rowResult);
3523             return true;
3524           }
3525           HRegionLocation l = list.getRegionLocation();
3526           if (l == null) {
3527             return true;
3528           }
3529           if (!l.getRegionInfo().getTable().equals(desc.getTableName())) {
3530             return false;
3531           }
3532           if (l.getRegionInfo().isOffline() || l.getRegionInfo().isSplit()) return true;
3533           HRegionLocation[] locations = list.getRegionLocations();
3534           for (HRegionLocation location : locations) {
3535             if (location == null) continue;
3536             ServerName serverName = location.getServerName();
3537             // Make sure that regions are assigned to server
3538             if (serverName != null && serverName.getHostAndPort() != null) {
3539               actualRegCount.incrementAndGet();
3540             }
3541           }
3542           return true;
3543         }
3544       };
3545
3546       int tries = 0;
3547       int numRegs = (splitKeys == null ? 1 : splitKeys.length + 1) * desc.getRegionReplication();
3548       while (EnvironmentEdgeManager.currentTime() < deadlineTs) {
3549         actualRegCount.set(0);
3550         MetaTableAccessor.scanMetaForTableRegions(getAdmin().getConnection(), visitor,
3551           desc.getTableName());
3552         if (actualRegCount.get() == numRegs) {
3553           // all the regions are online
3554           return;
3555         }
3556
3557         try {
3558           Thread.sleep(getAdmin().getPauseTime(tries++));
3559         } catch (InterruptedException e) {
3560           throw new InterruptedIOException("Interrupted when opening" + " regions; "
3561               + actualRegCount.get() + " of " + numRegs + " regions processed so far");
3562         }
3563       }
3564       throw new TimeoutException("Only " + actualRegCount.get() + " of " + numRegs
3565           + " regions are online; retries exhausted.");
3566     }
3567   }
3568
3569   @InterfaceAudience.Private
3570   @InterfaceStability.Evolving
3571   protected static abstract class NamespaceFuture extends ProcedureFuture<Void> {
3572     private final String namespaceName;
3573
3574     public NamespaceFuture(final HBaseAdmin admin, final String namespaceName, final Long procId) {
3575       super(admin, procId);
3576       this.namespaceName = namespaceName;
3577     }
3578
3579     /**
3580      * @return the namespace name
3581      */
3582     protected String getNamespaceName() {
3583       return namespaceName;
3584     }
3585
3586     /**
3587      * @return the operation type like CREATE_NAMESPACE, DELETE_NAMESPACE, etc.
3588      */
3589     public abstract String getOperationType();
3590
3591     @Override
3592     public String toString() {
3593       return "Operation: " + getOperationType() + ", Namespace: " + getNamespaceName();
3594     }
3595   }
3596
3597   @Override
3598   public List<SecurityCapability> getSecurityCapabilities() throws IOException {
3599     try {
3600       return executeCallable(new MasterCallable<List<SecurityCapability>>(getConnection()) {
3601         @Override
3602         public List<SecurityCapability> call(int callTimeout) throws ServiceException {
3603           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
3604           controller.setCallTimeout(callTimeout);
3605           SecurityCapabilitiesRequest req = SecurityCapabilitiesRequest.newBuilder().build();
3606           return ProtobufUtil.toSecurityCapabilityList(
3607             master.getSecurityCapabilities(controller, req).getCapabilitiesList());
3608         }
3609       });
3610     } catch (IOException e) {
3611       if (e instanceof RemoteException) {
3612         e = ((RemoteException)e).unwrapRemoteException();
3613       }
3614       throw e;
3615     }
3616   }
3617
3618   @Override
3619   public boolean[] setSplitOrMergeEnabled(final boolean enabled, final boolean synchronous,
3620     final boolean skipLock, final MasterSwitchType... switchTypes) throws IOException {
3621     return executeCallable(new MasterCallable<boolean[]>(getConnection()) {
3622       @Override
3623       public boolean[] call(int callTimeout) throws ServiceException {
3624         MasterProtos.SetSplitOrMergeEnabledResponse response = master.setSplitOrMergeEnabled(null,
3625           RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous,
3626             skipLock, switchTypes));
3627         boolean[] result = new boolean[switchTypes.length];
3628         int i = 0;
3629         for (Boolean prevValue : response.getPrevValueList()) {
3630           result[i++] = prevValue;
3631         }
3632         return result;
3633       }
3634     });
3635   }
3636
3637   @Override
3638   public boolean isSplitOrMergeEnabled(final MasterSwitchType switchType) throws IOException {
3639     return executeCallable(new MasterCallable<Boolean>(getConnection()) {
3640       @Override
3641       public Boolean call(int callTimeout) throws ServiceException {
3642         return master.isSplitOrMergeEnabled(null,
3643           RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType)).getEnabled();
3644       }
3645     });
3646   }
3647
3648   @Override
3649   public void releaseSplitOrMergeLockAndRollback() throws IOException {
3650     executeCallable(new MasterCallable<Void>(getConnection()) {
3651       @Override
3652       public Void call(int callTimeout) throws ServiceException {
3653         master.releaseSplitOrMergeLockAndRollback(null,
3654           RequestConverter.buildReleaseSplitOrMergeLockAndRollbackRequest());
3655         return null;
3656       }
3657     });
3658   }
3659
3660   private HRegionInfo getMobRegionInfo(TableName tableName) {
3661     return new HRegionInfo(tableName, Bytes.toBytes(".mob"),
3662             HConstants.EMPTY_END_ROW, false, 0);
3663   }
3664
3665   private RpcControllerFactory getRpcControllerFactory() {
3666     return rpcControllerFactory;
3667   }
3668 }