View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.Arrays;
26  import java.util.HashMap;
27  import java.util.LinkedList;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Map.Entry;
31  import java.util.concurrent.Callable;
32  import java.util.concurrent.ExecutionException;
33  import java.util.concurrent.Future;
34  import java.util.concurrent.TimeUnit;
35  import java.util.concurrent.TimeoutException;
36  import java.util.concurrent.atomic.AtomicInteger;
37  import java.util.concurrent.atomic.AtomicReference;
38  import java.util.regex.Pattern;
39
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.conf.Configuration;
43  import org.apache.hadoop.hbase.Abortable;
44  import org.apache.hadoop.hbase.ClusterStatus;
45  import org.apache.hadoop.hbase.DoNotRetryIOException;
46  import org.apache.hadoop.hbase.HBaseConfiguration;
47  import org.apache.hadoop.hbase.HColumnDescriptor;
48  import org.apache.hadoop.hbase.HConstants;
49  import org.apache.hadoop.hbase.HRegionInfo;
50  import org.apache.hadoop.hbase.HRegionLocation;
51  import org.apache.hadoop.hbase.HTableDescriptor;
52  import org.apache.hadoop.hbase.MasterNotRunningException;
53  import org.apache.hadoop.hbase.MetaTableAccessor;
54  import org.apache.hadoop.hbase.NamespaceDescriptor;
55  import org.apache.hadoop.hbase.NamespaceNotFoundException;
56  import org.apache.hadoop.hbase.NotServingRegionException;
57  import org.apache.hadoop.hbase.ProcedureInfo;
58  import org.apache.hadoop.hbase.ProcedureUtil;
59  import org.apache.hadoop.hbase.RegionLocations;
60  import org.apache.hadoop.hbase.ServerName;
61  import org.apache.hadoop.hbase.TableExistsException;
62  import org.apache.hadoop.hbase.TableName;
63  import org.apache.hadoop.hbase.TableNotDisabledException;
64  import org.apache.hadoop.hbase.TableNotFoundException;
65  import org.apache.hadoop.hbase.UnknownRegionException;
66  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
67  import org.apache.hadoop.hbase.classification.InterfaceAudience;
68  import org.apache.hadoop.hbase.classification.InterfaceStability;
69  import org.apache.hadoop.hbase.client.security.SecurityCapability;
70  import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
71  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
72  import org.apache.hadoop.hbase.ipc.MasterCoprocessorRpcChannel;
73  import org.apache.hadoop.hbase.ipc.HBaseRpcController;
74  import org.apache.hadoop.hbase.ipc.RegionServerCoprocessorRpcChannel;
75  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
76  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
77  import org.apache.hadoop.hbase.protobuf.RequestConverter;
78  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
79  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
80  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
81  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
82  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
83  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
84  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
85  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
86  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
87  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
88  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
89  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
90  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
91  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
92  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
93  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
94  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema;
95  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
96  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AbortProcedureRequest;
97  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AbortProcedureResponse;
98  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
99  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
100 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
101 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest;
102 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceResponse;
103 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest;
104 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableResponse;
105 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnRequest;
106 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnResponse;
107 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
108 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
109 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
110 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableRequest;
111 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableResponse;
112 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableRequest;
113 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableResponse;
114 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest;
115 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsResponse;
116 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableRequest;
117 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableResponse;
118 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest;
119 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse;
120 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusRequest;
121 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
122 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
123 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultRequest;
124 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultResponse;
125 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
126 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
127 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
128 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
129 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesRequest;
130 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
131 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
132 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
133 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
134 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
135 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
136 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
137 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListProceduresRequest;
138 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
139 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
140 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
141 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
142 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnRequest;
143 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnResponse;
144 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
145 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
146 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableRequest;
147 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableResponse;
148 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionRequest;
149 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
150 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
151 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
152 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
153 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
154 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownRequest;
155 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest;
156 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse;
157 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterRequest;
158 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableRequest;
159 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableResponse;
160 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest;
161 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
162 import org.apache.hadoop.hbase.quotas.QuotaFilter;
163 import org.apache.hadoop.hbase.quotas.QuotaRetriever;
164 import org.apache.hadoop.hbase.quotas.QuotaSettings;
165 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
166 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
167 import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
168 import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
169 import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
170 import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
171 import org.apache.hadoop.hbase.util.Addressing;
172 import org.apache.hadoop.hbase.util.Bytes;
173 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
174 import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
175 import org.apache.hadoop.hbase.util.Pair;
176 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
177 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
178 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
179 import org.apache.hadoop.ipc.RemoteException;
180 import org.apache.hadoop.util.StringUtils;
181 import org.apache.zookeeper.KeeperException;
182
183 import com.google.common.annotations.VisibleForTesting;
184 import com.google.protobuf.ServiceException;
185
186 /**
187  * HBaseAdmin is no longer a client API. It is marked InterfaceAudience.Private indicating that
188  * this is an HBase-internal class as defined in
189  * https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html
190  * There are no guarantees for backwards source / binary compatibility and methods or class can
191  * change or go away without deprecation.
192  * Use {@link Connection#getAdmin()} to obtain an instance of {@link Admin} instead of constructing
193  * an HBaseAdmin directly.
194  *
195  * <p>Connection should be an <i>unmanaged</i> connection obtained via
196  * {@link ConnectionFactory#createConnection(Configuration)}
197  *
198  * @see ConnectionFactory
199  * @see Connection
200  * @see Admin
201  */
202 @InterfaceAudience.Private
203 @InterfaceStability.Evolving
204 public class HBaseAdmin implements Admin {
205   private static final Log LOG = LogFactory.getLog(HBaseAdmin.class);
206
207   private static final String ZK_IDENTIFIER_PREFIX =  "hbase-admin-on-";
208
209   private ClusterConnection connection;
210
211   private volatile Configuration conf;
212   private final long pause;
213   private final int numRetries;
214   private final int syncWaitTimeout;
215   private boolean aborted;
216   private int operationTimeout;
217   private int rpcTimeout;
218
219   private RpcRetryingCallerFactory rpcCallerFactory;
220   private RpcControllerFactory rpcControllerFactory;
221
222   private NonceGenerator ng;
223
224   @Override
225   public int getOperationTimeout() {
226     return operationTimeout;
227   }
228
229   HBaseAdmin(ClusterConnection connection) throws IOException {
230     this.conf = connection.getConfiguration();
231     this.connection = connection;
232
233     // TODO: receive ConnectionConfiguration here rather than re-parsing these configs every time.
234     this.pause = this.conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
235         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
236     this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
237         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
238     this.operationTimeout = this.conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
239         HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
240     this.rpcTimeout = this.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
241         HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
242     this.syncWaitTimeout = this.conf.getInt(
243       "hbase.client.sync.wait.timeout.msec", 10 * 60000); // 10min
244
245     this.rpcCallerFactory = connection.getRpcRetryingCallerFactory();
246     this.rpcControllerFactory = connection.getRpcControllerFactory();
247
248     this.ng = this.connection.getNonceGenerator();
249   }
250
251   @Override
252   public void abort(String why, Throwable e) {
253     // Currently does nothing but throw the passed message and exception
254     this.aborted = true;
255     throw new RuntimeException(why, e);
256   }
257
258   @Override
259   public boolean isAborted() {
260     return this.aborted;
261   }
262
263   @Override
264   public boolean abortProcedure(final long procId, final boolean mayInterruptIfRunning)
265   throws IOException {
266     return get(abortProcedureAsync(procId, mayInterruptIfRunning), this.syncWaitTimeout,
267       TimeUnit.MILLISECONDS);
268   }
269
270   @Override
271   public Future<Boolean> abortProcedureAsync(final long procId, final boolean mayInterruptIfRunning)
272       throws IOException {
273     Boolean abortProcResponse =
274         executeCallable(new MasterCallable<AbortProcedureResponse>(getConnection(),
275             getRpcControllerFactory()) {
276       @Override
277       protected AbortProcedureResponse rpcCall() throws Exception {
278         AbortProcedureRequest abortProcRequest =
279             AbortProcedureRequest.newBuilder().setProcId(procId).build();
280         return master.abortProcedure(getRpcController(), abortProcRequest);
281       }
282     }).getIsProcedureAborted();
283     return new AbortProcedureFuture(this, procId, abortProcResponse);
284   }
285
286   private static class AbortProcedureFuture extends ProcedureFuture<Boolean> {
287     private boolean isAbortInProgress;
288
289     public AbortProcedureFuture(
290         final HBaseAdmin admin,
291         final Long procId,
292         final Boolean abortProcResponse) {
293       super(admin, procId);
294       this.isAbortInProgress = abortProcResponse;
295     }
296
297     @Override
298     public Boolean get(long timeout, TimeUnit unit)
299         throws InterruptedException, ExecutionException, TimeoutException {
300       if (!this.isAbortInProgress) {
301         return false;
302       }
303       super.get(timeout, unit);
304       return true;
305     }
306   }
307
308   /** @return Connection used by this object. */
309   @Override
310   public Connection getConnection() {
311     return connection;
312   }
313
314   @Override
315   public boolean tableExists(final TableName tableName) throws IOException {
316     return executeCallable(new RpcRetryingCallable<Boolean>() {
317       @Override
318       protected Boolean rpcCall(int callTimeout) throws Exception {
319         return MetaTableAccessor.tableExists(connection, tableName);
320       }
321     });
322   }
323
324   @Override
325   public HTableDescriptor[] listTables() throws IOException {
326     return listTables((Pattern)null, false);
327   }
328
329   @Override
330   public HTableDescriptor[] listTables(Pattern pattern) throws IOException {
331     return listTables(pattern, false);
332   }
333
334   @Override
335   public HTableDescriptor[] listTables(String regex) throws IOException {
336     return listTables(Pattern.compile(regex), false);
337   }
338
339   @Override
340   public HTableDescriptor[] listTables(final Pattern pattern, final boolean includeSysTables)
341       throws IOException {
342     return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection(),
343         getRpcControllerFactory()) {
344       @Override
345       protected HTableDescriptor[] rpcCall() throws Exception {
346         GetTableDescriptorsRequest req =
347             RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables);
348         return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(getRpcController(),
349             req));
350       }
351     });
352   }
353
354   @Override
355   public HTableDescriptor[] listTables(String regex, boolean includeSysTables)
356       throws IOException {
357     return listTables(Pattern.compile(regex), includeSysTables);
358   }
359
360   @Override
361   public TableName[] listTableNames() throws IOException {
362     return listTableNames((Pattern)null, false);
363   }
364
365   @Override
366   public TableName[] listTableNames(Pattern pattern) throws IOException {
367     return listTableNames(pattern, false);
368   }
369
370   @Override
371   public TableName[] listTableNames(String regex) throws IOException {
372     return listTableNames(Pattern.compile(regex), false);
373   }
374
375   @Override
376   public TableName[] listTableNames(final Pattern pattern, final boolean includeSysTables)
377       throws IOException {
378     return executeCallable(new MasterCallable<TableName[]>(getConnection(),
379         getRpcControllerFactory()) {
380       @Override
381       protected TableName[] rpcCall() throws Exception {
382         GetTableNamesRequest req =
383             RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables);
384         return ProtobufUtil.getTableNameArray(master.getTableNames(getRpcController(), req)
385             .getTableNamesList());
386       }
387     });
388   }
389
390   @Override
391   public TableName[] listTableNames(final String regex, final boolean includeSysTables)
392       throws IOException {
393     return listTableNames(Pattern.compile(regex), includeSysTables);
394   }
395
396   @Override
397   public HTableDescriptor getTableDescriptor(final TableName tableName) throws IOException {
398     return getTableDescriptor(tableName, getConnection(), rpcCallerFactory, rpcControllerFactory,
399        operationTimeout, rpcTimeout);
400   }
401
402   static HTableDescriptor getTableDescriptor(final TableName tableName, Connection connection,
403       RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory,
404       int operationTimeout, int rpcTimeout) throws IOException {
405     if (tableName == null) return null;
406     HTableDescriptor htd =
407         executeCallable(new MasterCallable<HTableDescriptor>(connection, rpcControllerFactory) {
408       @Override
409       protected HTableDescriptor rpcCall() throws Exception {
410         GetTableDescriptorsRequest req =
411             RequestConverter.buildGetTableDescriptorsRequest(tableName);
412         GetTableDescriptorsResponse htds = master.getTableDescriptors(getRpcController(), req);
413         if (!htds.getTableSchemaList().isEmpty()) {
414           return ProtobufUtil.convertToHTableDesc(htds.getTableSchemaList().get(0));
415         }
416         return null;
417       }
418     }, rpcCallerFactory, operationTimeout, rpcTimeout);
419     if (htd != null) {
420       return htd;
421     }
422     throw new TableNotFoundException(tableName.getNameAsString());
423   }
424
425   private long getPauseTime(int tries) {
426     int triesCount = tries;
427     if (triesCount >= HConstants.RETRY_BACKOFF.length) {
428       triesCount = HConstants.RETRY_BACKOFF.length - 1;
429     }
430     return this.pause * HConstants.RETRY_BACKOFF[triesCount];
431   }
432
433   @Override
434   public void createTable(HTableDescriptor desc)
435   throws IOException {
436     createTable(desc, null);
437   }
438
439   @Override
440   public void createTable(HTableDescriptor desc, byte [] startKey,
441       byte [] endKey, int numRegions)
442   throws IOException {
443     if(numRegions < 3) {
444       throw new IllegalArgumentException("Must create at least three regions");
445     } else if(Bytes.compareTo(startKey, endKey) >= 0) {
446       throw new IllegalArgumentException("Start key must be smaller than end key");
447     }
448     if (numRegions == 3) {
449       createTable(desc, new byte[][]{startKey, endKey});
450       return;
451     }
452     byte [][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
453     if(splitKeys == null || splitKeys.length != numRegions - 1) {
454       throw new IllegalArgumentException("Unable to split key range into enough regions");
455     }
456     createTable(desc, splitKeys);
457   }
458
459   @Override
460   public void createTable(final HTableDescriptor desc, byte [][] splitKeys)
461       throws IOException {
462     get(createTableAsync(desc, splitKeys), syncWaitTimeout, TimeUnit.MILLISECONDS);
463   }
464
465   @Override
466   public Future<Void> createTableAsync(final HTableDescriptor desc, final byte[][] splitKeys)
467       throws IOException {
468     if (desc.getTableName() == null) {
469       throw new IllegalArgumentException("TableName cannot be null");
470     }
471     if (splitKeys != null && splitKeys.length > 0) {
472       Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
473       // Verify there are no duplicate split keys
474       byte[] lastKey = null;
475       for (byte[] splitKey : splitKeys) {
476         if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
477           throw new IllegalArgumentException(
478               "Empty split key must not be passed in the split keys.");
479         }
480         if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
481           throw new IllegalArgumentException("All split keys must be unique, " +
482             "found duplicate: " + Bytes.toStringBinary(splitKey) +
483             ", " + Bytes.toStringBinary(lastKey));
484         }
485         lastKey = splitKey;
486       }
487     }
488
489     CreateTableResponse response = executeCallable(
490       new MasterCallable<CreateTableResponse>(getConnection(), getRpcControllerFactory()) {
491         @Override
492         protected CreateTableResponse rpcCall() throws Exception {
493           setPriority(desc.getTableName());
494           CreateTableRequest request = RequestConverter.buildCreateTableRequest(
495             desc, splitKeys, ng.getNonceGroup(), ng.newNonce());
496           return master.createTable(getRpcController(), request);
497         }
498       });
499     return new CreateTableFuture(this, desc, splitKeys, response);
500   }
501
502   private static class CreateTableFuture extends TableFuture<Void> {
503     private final HTableDescriptor desc;
504     private final byte[][] splitKeys;
505
506     public CreateTableFuture(final HBaseAdmin admin, final HTableDescriptor desc,
507         final byte[][] splitKeys, final CreateTableResponse response) {
508       super(admin, desc.getTableName(),
509               (response != null && response.hasProcId()) ? response.getProcId() : null);
510       this.splitKeys = splitKeys;
511       this.desc = desc;
512     }
513
514     @Override
515     protected HTableDescriptor getTableDescriptor() {
516       return desc;
517     }
518
519     @Override
520     public String getOperationType() {
521       return "CREATE";
522     }
523
524     @Override
525     protected Void waitOperationResult(final long deadlineTs) throws IOException, TimeoutException {
526       waitForTableEnabled(deadlineTs);
527       waitForAllRegionsOnline(deadlineTs, splitKeys);
528       return null;
529     }
530   }
531
532   @Override
533   public void deleteTable(final TableName tableName) throws IOException {
534     get(deleteTableAsync(tableName), syncWaitTimeout, TimeUnit.MILLISECONDS);
535   }
536
537   @Override
538   public Future<Void> deleteTableAsync(final TableName tableName) throws IOException {
539     DeleteTableResponse response = executeCallable(
540       new MasterCallable<DeleteTableResponse>(getConnection(), getRpcControllerFactory()) {
541         @Override
542         protected DeleteTableResponse rpcCall() throws Exception {
543           setPriority(tableName);
544           DeleteTableRequest req =
545               RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(),ng.newNonce());
546           return master.deleteTable(getRpcController(), req);
547         }
548       });
549     return new DeleteTableFuture(this, tableName, response);
550   }
551
552   private static class DeleteTableFuture extends TableFuture<Void> {
553     public DeleteTableFuture(final HBaseAdmin admin, final TableName tableName,
554         final DeleteTableResponse response) {
555       super(admin, tableName,
556               (response != null && response.hasProcId()) ? response.getProcId() : null);
557     }
558
559     @Override
560     public String getOperationType() {
561       return "DELETE";
562     }
563
564     @Override
565     protected Void waitOperationResult(final long deadlineTs)
566         throws IOException, TimeoutException {
567       waitTableNotFound(deadlineTs);
568       return null;
569     }
570
571     @Override
572     protected Void postOperationResult(final Void result, final long deadlineTs)
573         throws IOException, TimeoutException {
574       // Delete cached information to prevent clients from using old locations
575       ((ClusterConnection) getAdmin().getConnection()).clearRegionCache(getTableName());
576       return super.postOperationResult(result, deadlineTs);
577     }
578   }
579
580   @Override
581   public HTableDescriptor[] deleteTables(String regex) throws IOException {
582     return deleteTables(Pattern.compile(regex));
583   }
584
585   /**
586    * Delete tables matching the passed in pattern and wait on completion.
587    *
588    * Warning: Use this method carefully, there is no prompting and the effect is
589    * immediate. Consider using {@link #listTables(java.util.regex.Pattern) } and
590    * {@link #deleteTable(TableName)}
591    *
592    * @param pattern The pattern to match table names against
593    * @return Table descriptors for tables that couldn't be deleted
594    * @throws IOException
595    */
596   @Override
597   public HTableDescriptor[] deleteTables(Pattern pattern) throws IOException {
598     List<HTableDescriptor> failed = new LinkedList<HTableDescriptor>();
599     for (HTableDescriptor table : listTables(pattern)) {
600       try {
601         deleteTable(table.getTableName());
602       } catch (IOException ex) {
603         LOG.info("Failed to delete table " + table.getTableName(), ex);
604         failed.add(table);
605       }
606     }
607     return failed.toArray(new HTableDescriptor[failed.size()]);
608   }
609
610   @Override
611   public void truncateTable(final TableName tableName, final boolean preserveSplits)
612       throws IOException {
613     get(truncateTableAsync(tableName, preserveSplits), syncWaitTimeout, TimeUnit.MILLISECONDS);
614   }
615
616   @Override
617   public Future<Void> truncateTableAsync(final TableName tableName, final boolean preserveSplits)
618       throws IOException {
619     TruncateTableResponse response =
620         executeCallable(new MasterCallable<TruncateTableResponse>(getConnection(),
621             getRpcControllerFactory()) {
622           @Override
623           protected TruncateTableResponse rpcCall() throws Exception {
624             setPriority(tableName);
625             LOG.info("Started truncating " + tableName);
626             TruncateTableRequest req = RequestConverter.buildTruncateTableRequest(
627               tableName, preserveSplits, ng.getNonceGroup(), ng.newNonce());
628             return master.truncateTable(getRpcController(), req);
629           }
630         });
631     return new TruncateTableFuture(this, tableName, preserveSplits, response);
632   }
633
634   private static class TruncateTableFuture extends TableFuture<Void> {
635     private final boolean preserveSplits;
636
637     public TruncateTableFuture(final HBaseAdmin admin, final TableName tableName,
638         final boolean preserveSplits, final TruncateTableResponse response) {
639       super(admin, tableName,
640              (response != null && response.hasProcId()) ? response.getProcId() : null);
641       this.preserveSplits = preserveSplits;
642     }
643
644     @Override
645     public String getOperationType() {
646       return "TRUNCATE";
647     }
648
649     @Override
650     protected Void waitOperationResult(final long deadlineTs) throws IOException, TimeoutException {
651       waitForTableEnabled(deadlineTs);
652       // once the table is enabled, we know the operation is done. so we can fetch the splitKeys
653       byte[][] splitKeys = preserveSplits ? getAdmin().getTableSplits(getTableName()) : null;
654       waitForAllRegionsOnline(deadlineTs, splitKeys);
655       return null;
656     }
657   }
658
659   private byte[][] getTableSplits(final TableName tableName) throws IOException {
660     byte[][] splits = null;
661     try (RegionLocator locator = getConnection().getRegionLocator(tableName)) {
662       byte[][] startKeys = locator.getStartKeys();
663       if (startKeys.length == 1) {
664         return splits;
665       }
666       splits = new byte[startKeys.length - 1][];
667       for (int i = 1; i < startKeys.length; i++) {
668         splits[i - 1] = startKeys[i];
669       }
670     }
671     return splits;
672   }
673
674   @Override
675   public void enableTable(final TableName tableName)
676   throws IOException {
677     get(enableTableAsync(tableName), syncWaitTimeout, TimeUnit.MILLISECONDS);
678   }
679
680   @Override
681   public Future<Void> enableTableAsync(final TableName tableName) throws IOException {
682     TableName.isLegalFullyQualifiedTableName(tableName.getName());
683     EnableTableResponse response = executeCallable(
684       new MasterCallable<EnableTableResponse>(getConnection(), getRpcControllerFactory()) {
685         @Override
686         protected EnableTableResponse rpcCall() throws Exception {
687           setPriority(tableName);
688           LOG.info("Started enable of " + tableName);
689           EnableTableRequest req =
690               RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(),ng.newNonce());
691           return master.enableTable(getRpcController(),req);
692         }
693       });
694     return new EnableTableFuture(this, tableName, response);
695   }
696
697   private static class EnableTableFuture extends TableFuture<Void> {
698     public EnableTableFuture(final HBaseAdmin admin, final TableName tableName,
699         final EnableTableResponse response) {
700       super(admin, tableName,
701               (response != null && response.hasProcId()) ? response.getProcId() : null);
702     }
703
704     @Override
705     public String getOperationType() {
706       return "ENABLE";
707     }
708
709     @Override
710     protected Void waitOperationResult(final long deadlineTs) throws IOException, TimeoutException {
711       waitForTableEnabled(deadlineTs);
712       return null;
713     }
714   }
715
716   @Override
717   public HTableDescriptor[] enableTables(String regex) throws IOException {
718     return enableTables(Pattern.compile(regex));
719   }
720
721   @Override
722   public HTableDescriptor[] enableTables(Pattern pattern) throws IOException {
723     List<HTableDescriptor> failed = new LinkedList<HTableDescriptor>();
724     for (HTableDescriptor table : listTables(pattern)) {
725       if (isTableDisabled(table.getTableName())) {
726         try {
727           enableTable(table.getTableName());
728         } catch (IOException ex) {
729           LOG.info("Failed to enable table " + table.getTableName(), ex);
730           failed.add(table);
731         }
732       }
733     }
734     return failed.toArray(new HTableDescriptor[failed.size()]);
735   }
736
737   @Override
738   public void disableTable(final TableName tableName)
739   throws IOException {
740     get(disableTableAsync(tableName), syncWaitTimeout, TimeUnit.MILLISECONDS);
741   }
742
743   @Override
744   public Future<Void> disableTableAsync(final TableName tableName) throws IOException {
745     TableName.isLegalFullyQualifiedTableName(tableName.getName());
746     DisableTableResponse response = executeCallable(
747       new MasterCallable<DisableTableResponse>(getConnection(), getRpcControllerFactory()) {
748         @Override
749         protected DisableTableResponse rpcCall() throws Exception {
750           setPriority(tableName);
751           LOG.info("Started disable of " + tableName);
752           DisableTableRequest req =
753               RequestConverter.buildDisableTableRequest(
754                 tableName, ng.getNonceGroup(), ng.newNonce());
755           return master.disableTable(getRpcController(), req);
756         }
757       });
758     return new DisableTableFuture(this, tableName, response);
759   }
760
761   private static class DisableTableFuture extends TableFuture<Void> {
762     public DisableTableFuture(final HBaseAdmin admin, final TableName tableName,
763         final DisableTableResponse response) {
764       super(admin, tableName,
765               (response != null && response.hasProcId()) ? response.getProcId() : null);
766     }
767
768     @Override
769     public String getOperationType() {
770       return "DISABLE";
771     }
772
773     @Override
774     protected Void waitOperationResult(long deadlineTs) throws IOException, TimeoutException {
775       waitForTableDisabled(deadlineTs);
776       return null;
777     }
778   }
779
780   @Override
781   public HTableDescriptor[] disableTables(String regex) throws IOException {
782     return disableTables(Pattern.compile(regex));
783   }
784
785   @Override
786   public HTableDescriptor[] disableTables(Pattern pattern) throws IOException {
787     List<HTableDescriptor> failed = new LinkedList<HTableDescriptor>();
788     for (HTableDescriptor table : listTables(pattern)) {
789       if (isTableEnabled(table.getTableName())) {
790         try {
791           disableTable(table.getTableName());
792         } catch (IOException ex) {
793           LOG.info("Failed to disable table " + table.getTableName(), ex);
794           failed.add(table);
795         }
796       }
797     }
798     return failed.toArray(new HTableDescriptor[failed.size()]);
799   }
800
801   @Override
802   public boolean isTableEnabled(final TableName tableName) throws IOException {
803     checkTableExists(tableName);
804     return executeCallable(new RpcRetryingCallable<Boolean>() {
805       @Override
806       protected Boolean rpcCall(int callTimeout) throws Exception {
807         TableState tableState = MetaTableAccessor.getTableState(getConnection(), tableName);
808         if (tableState == null) {
809           throw new TableNotFoundException(tableName);
810         }
811         return tableState.inStates(TableState.State.ENABLED);
812       }
813     });
814   }
815
816   @Override
817   public boolean isTableDisabled(TableName tableName) throws IOException {
818     checkTableExists(tableName);
819     return connection.isTableDisabled(tableName);
820   }
821
822   @Override
823   public boolean isTableAvailable(TableName tableName) throws IOException {
824     return connection.isTableAvailable(tableName, null);
825   }
826
827   @Override
828   public boolean isTableAvailable(TableName tableName, byte[][] splitKeys) throws IOException {
829     return connection.isTableAvailable(tableName, splitKeys);
830   }
831
832   @Override
833   public Pair<Integer, Integer> getAlterStatus(final TableName tableName) throws IOException {
834     return executeCallable(new MasterCallable<Pair<Integer, Integer>>(getConnection(),
835         getRpcControllerFactory()) {
836       @Override
837       protected Pair<Integer, Integer> rpcCall() throws Exception {
838         setPriority(tableName);
839         GetSchemaAlterStatusRequest req = RequestConverter
840             .buildGetSchemaAlterStatusRequest(tableName);
841         GetSchemaAlterStatusResponse ret = master.getSchemaAlterStatus(getRpcController(), req);
842         Pair<Integer, Integer> pair = new Pair<>(ret.getYetToUpdateRegions(),
843             ret.getTotalRegions());
844         return pair;
845       }
846     });
847   }
848
849   @Override
850   public Pair<Integer, Integer> getAlterStatus(final byte[] tableName) throws IOException {
851     return getAlterStatus(TableName.valueOf(tableName));
852   }
853
854   /**
855    * {@inheritDoc}
856    * @deprecated Since 2.0. Will be removed in 3.0. Use
857    *     {@link #addColumnFamily(TableName, HColumnDescriptor)} instead.
858    */
859   @Override
860   @Deprecated
861   public void addColumn(final TableName tableName, final HColumnDescriptor columnFamily)
862   throws IOException {
863     addColumnFamily(tableName, columnFamily);
864   }
865
866   @Override
867   public void addColumnFamily(final TableName tableName, final HColumnDescriptor columnFamily)
868       throws IOException {
869     get(addColumnFamilyAsync(tableName, columnFamily), syncWaitTimeout, TimeUnit.MILLISECONDS);
870   }
871
872   @Override
873   public Future<Void> addColumnFamilyAsync(final TableName tableName,
874       final HColumnDescriptor columnFamily) throws IOException {
875     AddColumnResponse response =
876         executeCallable(new MasterCallable<AddColumnResponse>(getConnection(),
877             getRpcControllerFactory()) {
878           @Override
879           protected AddColumnResponse rpcCall() throws Exception {
880             setPriority(tableName);
881             AddColumnRequest req =
882                 RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
883                   ng.newNonce());
884             return master.addColumn(getRpcController(), req);
885           }
886         });
887     return new AddColumnFamilyFuture(this, tableName, response);
888   }
889
890   private static class AddColumnFamilyFuture extends ModifyTableFuture {
891     public AddColumnFamilyFuture(final HBaseAdmin admin, final TableName tableName,
892         final AddColumnResponse response) {
893       super(admin, tableName, (response != null && response.hasProcId()) ? response.getProcId()
894           : null);
895     }
896
897     @Override
898     public String getOperationType() {
899       return "ADD_COLUMN_FAMILY";
900     }
901   }
902
903   /**
904    * {@inheritDoc}
905    * @deprecated Since 2.0. Will be removed in 3.0. Use
906    *     {@link #deleteColumnFamily(TableName, byte[])} instead.
907    */
908   @Override
909   @Deprecated
910   public void deleteColumn(final TableName tableName, final byte[] columnFamily)
911   throws IOException {
912     deleteColumnFamily(tableName, columnFamily);
913   }
914
915   @Override
916   public void deleteColumnFamily(final TableName tableName, final byte[] columnFamily)
917       throws IOException {
918     get(deleteColumnFamilyAsync(tableName, columnFamily), syncWaitTimeout, TimeUnit.MILLISECONDS);
919   }
920
921   @Override
922   public Future<Void> deleteColumnFamilyAsync(final TableName tableName, final byte[] columnFamily)
923       throws IOException {
924     DeleteColumnResponse response =
925         executeCallable(new MasterCallable<DeleteColumnResponse>(getConnection(),
926             getRpcControllerFactory()) {
927           @Override
928           protected DeleteColumnResponse rpcCall() throws Exception {
929             setPriority(tableName);
930             DeleteColumnRequest req =
931                 RequestConverter.buildDeleteColumnRequest(tableName, columnFamily,
932                   ng.getNonceGroup(), ng.newNonce());
933             return master.deleteColumn(getRpcController(), req);
934           }
935         });
936     return new DeleteColumnFamilyFuture(this, tableName, response);
937   }
938
939   private static class DeleteColumnFamilyFuture extends ModifyTableFuture {
940     public DeleteColumnFamilyFuture(final HBaseAdmin admin, final TableName tableName,
941         final DeleteColumnResponse response) {
942       super(admin, tableName, (response != null && response.hasProcId()) ? response.getProcId()
943           : null);
944     }
945
946     @Override
947     public String getOperationType() {
948       return "DELETE_COLUMN_FAMILY";
949     }
950   }
951
952   /**
953    * {@inheritDoc}
954    * @deprecated As of 2.0. Will be removed in 3.0. Use
955    *     {@link #modifyColumnFamily(TableName, HColumnDescriptor)} instead.
956    */
957   @Override
958   @Deprecated
959   public void modifyColumn(final TableName tableName, final HColumnDescriptor columnFamily)
960   throws IOException {
961     modifyColumnFamily(tableName, columnFamily);
962   }
963
964   @Override
965   public void modifyColumnFamily(final TableName tableName,
966       final HColumnDescriptor columnFamily) throws IOException {
967     get(modifyColumnFamilyAsync(tableName, columnFamily), syncWaitTimeout, TimeUnit.MILLISECONDS);
968   }
969
970   @Override
971   public Future<Void> modifyColumnFamilyAsync(final TableName tableName,
972       final HColumnDescriptor columnFamily) throws IOException {
973     ModifyColumnResponse response =
974         executeCallable(new MasterCallable<ModifyColumnResponse>(getConnection(),
975             getRpcControllerFactory()) {
976           @Override
977           protected ModifyColumnResponse rpcCall() throws Exception {
978             setPriority(tableName);
979             ModifyColumnRequest req =
980                 RequestConverter.buildModifyColumnRequest(tableName, columnFamily,
981                   ng.getNonceGroup(), ng.newNonce());
982             return master.modifyColumn(getRpcController(), req);
983           }
984         });
985     return new ModifyColumnFamilyFuture(this, tableName, response);
986   }
987
988   private static class ModifyColumnFamilyFuture extends ModifyTableFuture {
989     public ModifyColumnFamilyFuture(final HBaseAdmin admin, final TableName tableName,
990         final ModifyColumnResponse response) {
991       super(admin, tableName, (response != null && response.hasProcId()) ? response.getProcId()
992           : null);
993     }
994
995     @Override
996     public String getOperationType() {
997       return "MODIFY_COLUMN_FAMILY";
998     }
999   }
1000
1001   @Override
1002   public void closeRegion(final String regionname, final String serverName) throws IOException {
1003     closeRegion(Bytes.toBytes(regionname), serverName);
1004   }
1005
1006   @Override
1007   public void closeRegion(final byte [] regionname, final String serverName) throws IOException {
1008     if (serverName != null) {
1009       Pair<HRegionInfo, ServerName> pair = MetaTableAccessor.getRegion(connection, regionname);
1010       if (pair == null || pair.getFirst() == null) {
1011         throw new UnknownRegionException(Bytes.toStringBinary(regionname));
1012       } else {
1013         closeRegion(ServerName.valueOf(serverName), pair.getFirst());
1014       }
1015     } else {
1016       Pair<HRegionInfo, ServerName> pair = MetaTableAccessor.getRegion(connection, regionname);
1017       if (pair == null) {
1018         throw new UnknownRegionException(Bytes.toStringBinary(regionname));
1019       } else if (pair.getSecond() == null) {
1020         throw new NoServerForRegionException(Bytes.toStringBinary(regionname));
1021       } else {
1022         closeRegion(pair.getSecond(), pair.getFirst());
1023       }
1024     }
1025   }
1026
1027   @Override
1028   public boolean closeRegionWithEncodedRegionName(final String encodedRegionName,
1029       final String serverName)
1030   throws IOException {
1031     if (null == serverName || ("").equals(serverName.trim())) {
1032       throw new IllegalArgumentException("The servername cannot be null or empty.");
1033     }
1034     ServerName sn = ServerName.valueOf(serverName);
1035     AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
1036     // Close the region without updating zk state.
1037     CloseRegionRequest request =
1038       RequestConverter.buildCloseRegionRequest(sn, encodedRegionName);
1039     // TODO: There is no timeout on this controller. Set one!
1040     HBaseRpcController controller = this.rpcControllerFactory.newController();
1041     try {
1042       CloseRegionResponse response = admin.closeRegion(controller, request);
1043       boolean closed = response.getClosed();
1044       if (false == closed) {
1045         LOG.error("Not able to close the region " + encodedRegionName + ".");
1046       }
1047       return closed;
1048     } catch (Exception e) {
1049       throw ProtobufUtil.handleRemoteException(e);
1050     }
1051   }
1052
1053   @Override
1054   public void closeRegion(final ServerName sn, final HRegionInfo hri) throws IOException {
1055     AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
1056     // TODO: There is no timeout on this controller. Set one!
1057     HBaseRpcController controller = rpcControllerFactory.newController();
1058
1059     // Close the region without updating zk state.
1060     ProtobufUtil.closeRegion(controller, admin, sn, hri.getRegionName());
1061   }
1062
1063   @Override
1064   public List<HRegionInfo> getOnlineRegions(final ServerName sn) throws IOException {
1065     AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
1066     // TODO: There is no timeout on this controller. Set one!
1067     HBaseRpcController controller = rpcControllerFactory.newController();
1068     return ProtobufUtil.getOnlineRegions(controller, admin);
1069   }
1070
1071   @Override
1072   public void flush(final TableName tableName) throws IOException {
1073     checkTableExists(tableName);
1074     if (isTableDisabled(tableName)) {
1075       LOG.info("Table is disabled: " + tableName.getNameAsString());
1076       return;
1077     }
1078     execProcedure("flush-table-proc", tableName.getNameAsString(),
1079       new HashMap<String, String>());
1080   }
1081
1082   @Override
1083   public void flushRegion(final byte[] regionName) throws IOException {
1084     Pair<HRegionInfo, ServerName> regionServerPair = getRegion(regionName);
1085     if (regionServerPair == null) {
1086       throw new IllegalArgumentException("Unknown regionname: " + Bytes.toStringBinary(regionName));
1087     }
1088     if (regionServerPair.getSecond() == null) {
1089       throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
1090     }
1091     final HRegionInfo hRegionInfo = regionServerPair.getFirst();
1092     ServerName serverName = regionServerPair.getSecond();
1093     final AdminService.BlockingInterface admin = this.connection.getAdmin(serverName);
1094     Callable<Void> callable = new Callable<Void>() {
1095       @Override
1096       public Void call() throws Exception {
1097         // TODO: There is no timeout on this controller. Set one!
1098         HBaseRpcController controller = rpcControllerFactory.newController();
1099         FlushRegionRequest request =
1100             RequestConverter.buildFlushRegionRequest(hRegionInfo.getRegionName());
1101         admin.flushRegion(controller, request);
1102         return null;
1103       }
1104     };
1105     ProtobufUtil.call(callable);
1106   }
1107
1108   /**
1109    * {@inheritDoc}
1110    */
1111   @Override
1112   public void compact(final TableName tableName)
1113     throws IOException {
1114     compact(tableName, null, false, CompactType.NORMAL);
1115   }
1116
1117   @Override
1118   public void compactRegion(final byte[] regionName)
1119     throws IOException {
1120     compactRegion(regionName, null, false);
1121   }
1122
1123   /**
1124    * {@inheritDoc}
1125    */
1126   @Override
1127   public void compact(final TableName tableName, final byte[] columnFamily)
1128     throws IOException {
1129     compact(tableName, columnFamily, false, CompactType.NORMAL);
1130   }
1131
1132   /**
1133    * {@inheritDoc}
1134    */
1135   @Override
1136   public void compactRegion(final byte[] regionName, final byte[] columnFamily)
1137     throws IOException {
1138     compactRegion(regionName, columnFamily, false);
1139   }
1140
1141   /**
1142    * {@inheritDoc}
1143    */
1144   @Override
1145   public void compactRegionServer(final ServerName sn, boolean major)
1146   throws IOException, InterruptedException {
1147     for (HRegionInfo region : getOnlineRegions(sn)) {
1148       compact(sn, region, major, null);
1149     }
1150   }
1151
1152   @Override
1153   public void majorCompact(final TableName tableName)
1154   throws IOException {
1155     compact(tableName, null, true, CompactType.NORMAL);
1156   }
1157
1158   @Override
1159   public void majorCompactRegion(final byte[] regionName)
1160   throws IOException {
1161     compactRegion(regionName, null, true);
1162   }
1163
1164   /**
1165    * {@inheritDoc}
1166    */
1167   @Override
1168   public void majorCompact(final TableName tableName, final byte[] columnFamily)
1169   throws IOException {
1170     compact(tableName, columnFamily, true, CompactType.NORMAL);
1171   }
1172
1173   @Override
1174   public void majorCompactRegion(final byte[] regionName, final byte[] columnFamily)
1175   throws IOException {
1176     compactRegion(regionName, columnFamily, true);
1177   }
1178
1179   /**
1180    * Compact a table.
1181    * Asynchronous operation.
1182    *
1183    * @param tableName table or region to compact
1184    * @param columnFamily column family within a table or region
1185    * @param major True if we are to do a major compaction.
1186    * @throws IOException if a remote or network exception occurs
1187    * @throws InterruptedException
1188    */
1189   private void compact(final TableName tableName, final byte[] columnFamily,final boolean major,
1190                        CompactType compactType) throws IOException {
1191     switch (compactType) {
1192       case MOB:
1193         ServerName master = getMasterAddress();
1194         compact(master, getMobRegionInfo(tableName), major, columnFamily);
1195         break;
1196       case NORMAL:
1197       default:
1198         ZooKeeperWatcher zookeeper = null;
1199         try {
1200           checkTableExists(tableName);
1201           zookeeper = new ZooKeeperWatcher(conf, ZK_IDENTIFIER_PREFIX + connection.toString(),
1202                   new ThrowableAbortable());
1203           List<Pair<HRegionInfo, ServerName>> pairs;
1204           if (TableName.META_TABLE_NAME.equals(tableName)) {
1205             pairs = new MetaTableLocator().getMetaRegionsAndLocations(zookeeper);
1206           } else {
1207             pairs = MetaTableAccessor.getTableRegionsAndLocations(connection, tableName);
1208           }
1209           for (Pair<HRegionInfo, ServerName> pair: pairs) {
1210             if (pair.getFirst().isOffline()) continue;
1211             if (pair.getSecond() == null) continue;
1212             try {
1213               compact(pair.getSecond(), pair.getFirst(), major, columnFamily);
1214             } catch (NotServingRegionException e) {
1215               if (LOG.isDebugEnabled()) {
1216                 LOG.debug("Trying to" + (major ? " major" : "") + " compact " +
1217                         pair.getFirst() + ": " +
1218                         StringUtils.stringifyException(e));
1219               }
1220             }
1221           }
1222         } finally {
1223           if (zookeeper != null) {
1224             zookeeper.close();
1225           }
1226         }
1227         break;
1228     }
1229   }
1230
1231   /**
1232    * Compact an individual region.
1233    * Asynchronous operation.
1234    *
1235    * @param regionName region to compact
1236    * @param columnFamily column family within a table or region
1237    * @param major True if we are to do a major compaction.
1238    * @throws IOException if a remote or network exception occurs
1239    * @throws InterruptedException
1240    */
1241   private void compactRegion(final byte[] regionName, final byte[] columnFamily,final boolean major)
1242   throws IOException {
1243     Pair<HRegionInfo, ServerName> regionServerPair = getRegion(regionName);
1244     if (regionServerPair == null) {
1245       throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName));
1246     }
1247     if (regionServerPair.getSecond() == null) {
1248       throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
1249     }
1250     compact(regionServerPair.getSecond(), regionServerPair.getFirst(), major, columnFamily);
1251   }
1252
1253   private void compact(final ServerName sn, final HRegionInfo hri,
1254       final boolean major, final byte [] family)
1255   throws IOException {
1256     final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
1257     Callable<Void> callable = new Callable<Void>() {
1258       @Override
1259       public Void call() throws Exception {
1260         // TODO: There is no timeout on this controller. Set one!
1261         HBaseRpcController controller = rpcControllerFactory.newController();
1262         CompactRegionRequest request =
1263             RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family);
1264         admin.compactRegion(controller, request);
1265         return null;
1266       }
1267     };
1268     ProtobufUtil.call(callable);
1269   }
1270
1271   @Override
1272   public void move(final byte [] encodedRegionName, final byte [] destServerName)
1273   throws IOException {
1274     executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
1275       @Override
1276       protected Void rpcCall() throws Exception {
1277         setPriority(encodedRegionName);
1278         MoveRegionRequest request =
1279             RequestConverter.buildMoveRegionRequest(encodedRegionName, destServerName);
1280         master.moveRegion(getRpcController(), request);
1281         return null;
1282       }
1283     });
1284   }
1285
1286   @Override
1287   public void assign(final byte [] regionName) throws MasterNotRunningException,
1288       ZooKeeperConnectionException, IOException {
1289     executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
1290       @Override
1291       protected Void rpcCall() throws Exception {
1292         setPriority(regionName);
1293         AssignRegionRequest request =
1294             RequestConverter.buildAssignRegionRequest(getRegionName(regionName));
1295         master.assignRegion(getRpcController(), request);
1296         return null;
1297       }
1298     });
1299   }
1300
1301   @Override
1302   public void unassign(final byte [] regionName, final boolean force)
1303   throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
1304     final byte[] toBeUnassigned = getRegionName(regionName);
1305     executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
1306       @Override
1307       protected Void rpcCall() throws Exception {
1308         setPriority(regionName);
1309         UnassignRegionRequest request =
1310             RequestConverter.buildUnassignRegionRequest(toBeUnassigned, force);
1311         master.unassignRegion(getRpcController(), request);
1312         return null;
1313       }
1314     });
1315   }
1316
1317   @Override
1318   public void offline(final byte [] regionName)
1319   throws IOException {
1320     executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
1321       @Override
1322       protected Void rpcCall() throws Exception {
1323         setPriority(regionName);
1324         master.offlineRegion(getRpcController(),
1325             RequestConverter.buildOfflineRegionRequest(regionName));
1326         return null;
1327       }
1328     });
1329   }
1330
1331   @Override
1332   public boolean setBalancerRunning(final boolean on, final boolean synchronous)
1333   throws IOException {
1334     return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1335       @Override
1336       protected Boolean rpcCall() throws Exception {
1337         SetBalancerRunningRequest req =
1338             RequestConverter.buildSetBalancerRunningRequest(on, synchronous);
1339         return master.setBalancerRunning(getRpcController(), req).getPrevBalanceValue();
1340       }
1341     });
1342   }
1343
1344   @Override
1345   public boolean balancer() throws IOException {
1346     return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1347       @Override
1348       protected Boolean rpcCall() throws Exception {
1349         return master.balance(getRpcController(),
1350             RequestConverter.buildBalanceRequest(false)).getBalancerRan();
1351       }
1352     });
1353   }
1354
1355   @Override
1356   public boolean balancer(final boolean force) throws IOException {
1357     return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1358       @Override
1359       protected Boolean rpcCall() throws Exception {
1360         return master.balance(getRpcController(),
1361             RequestConverter.buildBalanceRequest(force)).getBalancerRan();
1362       }
1363     });
1364   }
1365
1366   @Override
1367   public boolean isBalancerEnabled() throws IOException {
1368     return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1369       @Override
1370       protected Boolean rpcCall() throws Exception {
1371         return master.isBalancerEnabled(getRpcController(),
1372           RequestConverter.buildIsBalancerEnabledRequest()).getEnabled();
1373       }
1374     });
1375   }
1376
1377   @Override
1378   public boolean normalize() throws IOException {
1379     return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1380       @Override
1381       protected Boolean rpcCall() throws Exception {
1382         return master.normalize(getRpcController(),
1383             RequestConverter.buildNormalizeRequest()).getNormalizerRan();
1384       }
1385     });
1386   }
1387
1388   @Override
1389   public boolean isNormalizerEnabled() throws IOException {
1390     return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1391       @Override
1392       protected Boolean rpcCall() throws Exception {
1393         return master.isNormalizerEnabled(getRpcController(),
1394           RequestConverter.buildIsNormalizerEnabledRequest()).getEnabled();
1395       }
1396     });
1397   }
1398
1399   @Override
1400   public boolean setNormalizerRunning(final boolean on) throws IOException {
1401     return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1402       @Override
1403       protected Boolean rpcCall() throws Exception {
1404         SetNormalizerRunningRequest req =
1405           RequestConverter.buildSetNormalizerRunningRequest(on);
1406         return master.setNormalizerRunning(getRpcController(), req).getPrevNormalizerValue();
1407       }
1408     });
1409   }
1410
1411   @Override
1412   public boolean enableCatalogJanitor(final boolean enable) throws IOException {
1413     return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1414       @Override
1415       protected Boolean rpcCall() throws Exception {
1416         return master.enableCatalogJanitor(getRpcController(),
1417           RequestConverter.buildEnableCatalogJanitorRequest(enable)).getPrevValue();
1418       }
1419     });
1420   }
1421
1422   @Override
1423   public int runCatalogScan() throws IOException {
1424     return executeCallable(new MasterCallable<Integer>(getConnection(), getRpcControllerFactory()) {
1425       @Override
1426       protected Integer rpcCall() throws Exception {
1427         return master.runCatalogScan(getRpcController(),
1428           RequestConverter.buildCatalogScanRequest()).getScanResult();
1429       }
1430     });
1431   }
1432
1433   @Override
1434   public boolean isCatalogJanitorEnabled() throws IOException {
1435     return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1436       @Override
1437       protected Boolean rpcCall() throws Exception {
1438         return master.isCatalogJanitorEnabled(getRpcController(),
1439           RequestConverter.buildIsCatalogJanitorEnabledRequest()).getValue();
1440       }
1441     });
1442   }
1443
1444   private boolean isEncodedRegionName(byte[] regionName) throws IOException {
1445     try {
1446       HRegionInfo.parseRegionName(regionName);
1447       return false;
1448     } catch (IOException e) {
1449       if (StringUtils.stringifyException(e)
1450         .contains(HRegionInfo.INVALID_REGION_NAME_FORMAT_MESSAGE)) {
1451         return true;
1452       }
1453       throw e;
1454     }
1455   }
1456
1457   /**
1458    * Merge two regions. Synchronous operation.
1459    * Note: It is not feasible to predict the length of merge.
1460    *   Therefore, this is for internal testing only.
1461    * @param nameOfRegionA encoded or full name of region a
1462    * @param nameOfRegionB encoded or full name of region b
1463    * @param forcible true if do a compulsory merge, otherwise we will only merge
1464    *          two adjacent regions
1465    * @throws IOException
1466    */
1467   @VisibleForTesting
1468   public void mergeRegionsSync(
1469       final byte[] nameOfRegionA,
1470       final byte[] nameOfRegionB,
1471       final boolean forcible) throws IOException {
1472     get(
1473       mergeRegionsAsync(nameOfRegionA, nameOfRegionB, forcible),
1474       syncWaitTimeout,
1475       TimeUnit.MILLISECONDS);
1476   }
1477
1478   /**
1479    * Merge two regions. Asynchronous operation.
1480    * @param nameOfRegionA encoded or full name of region a
1481    * @param nameOfRegionB encoded or full name of region b
1482    * @param forcible true if do a compulsory merge, otherwise we will only merge
1483    *          two adjacent regions
1484    * @throws IOException
1485    * @deprecated Since 2.0. Will be removed in 3.0. Use
1486    *     {@link #mergeRegionsAsync(byte[], byte[], boolean)} instead.
1487    */
1488   @Deprecated
1489   @Override
1490   public void mergeRegions(final byte[] nameOfRegionA,
1491       final byte[] nameOfRegionB, final boolean forcible)
1492       throws IOException {
1493     mergeRegionsAsync(nameOfRegionA, nameOfRegionB, forcible);
1494   }
1495
1496   /**
1497    * Merge two regions. Asynchronous operation.
1498    * @param nameOfRegionA encoded or full name of region a
1499    * @param nameOfRegionB encoded or full name of region b
1500    * @param forcible true if do a compulsory merge, otherwise we will only merge
1501    *          two adjacent regions
1502    * @throws IOException
1503    */
1504   @Override
1505   public Future<Void> mergeRegionsAsync(
1506       final byte[] nameOfRegionA,
1507       final byte[] nameOfRegionB,
1508       final boolean forcible) throws IOException {
1509
1510     final byte[] encodedNameOfRegionA = isEncodedRegionName(nameOfRegionA) ?
1511       nameOfRegionA : HRegionInfo.encodeRegionName(nameOfRegionA).getBytes();
1512     final byte[] encodedNameOfRegionB = isEncodedRegionName(nameOfRegionB) ?
1513       nameOfRegionB : HRegionInfo.encodeRegionName(nameOfRegionB).getBytes();
1514
1515     TableName tableName;
1516     Pair<HRegionInfo, ServerName> pair = getRegion(nameOfRegionA);
1517
1518     if (pair != null) {
1519       if (pair.getFirst().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1520         throw new IllegalArgumentException ("Can't invoke merge on non-default regions directly");
1521       }
1522       tableName = pair.getFirst().getTable();
1523     } else {
1524       throw new UnknownRegionException (
1525         "Can't invoke merge on unknown region " + Bytes.toStringBinary(encodedNameOfRegionA));
1526     }
1527
1528     pair = getRegion(nameOfRegionB);
1529     if (pair != null) {
1530       if (pair.getFirst().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1531         throw new IllegalArgumentException ("Can't invoke merge on non-default regions directly");
1532       }
1533
1534       if (!tableName.equals(pair.getFirst().getTable())) {
1535         throw new IllegalArgumentException ("Cannot merge regions from two different tables " +
1536           tableName + " and " + pair.getFirst().getTable());
1537       }
1538     } else {
1539       throw new UnknownRegionException (
1540         "Can't invoke merge on unknown region " + Bytes.toStringBinary(encodedNameOfRegionB));
1541     }
1542
1543     DispatchMergingRegionsResponse response =
1544         executeCallable(new MasterCallable<DispatchMergingRegionsResponse>(getConnection(),
1545             getRpcControllerFactory()) {
1546       @Override
1547       protected DispatchMergingRegionsResponse rpcCall() throws Exception {
1548         DispatchMergingRegionsRequest request = RequestConverter
1549             .buildDispatchMergingRegionsRequest(
1550                 encodedNameOfRegionA,
1551                 encodedNameOfRegionB,
1552                 forcible,
1553                 ng.getNonceGroup(),
1554                 ng.newNonce());
1555         return master.dispatchMergingRegions(getRpcController(), request);
1556       }
1557     });
1558     return new DispatchMergingRegionsFuture(this, tableName, response);
1559   }
1560
1561   private static class DispatchMergingRegionsFuture extends TableFuture<Void> {
1562     public DispatchMergingRegionsFuture(
1563         final HBaseAdmin admin,
1564         final TableName tableName,
1565         final DispatchMergingRegionsResponse response) {
1566       super(admin, tableName,
1567           (response != null && response.hasProcId()) ? response.getProcId() : null);
1568     }
1569
1570     public DispatchMergingRegionsFuture(
1571         final HBaseAdmin admin,
1572         final TableName tableName,
1573         final Long procId) {
1574       super(admin, tableName, procId);
1575     }
1576
1577     @Override
1578     public String getOperationType() {
1579       return "MERGE_REGIONS";
1580     }
1581   }
1582
1583   @Override
1584   public void split(final TableName tableName) throws IOException {
1585     split(tableName, null);
1586   }
1587
1588   @Override
1589   public void splitRegion(final byte[] regionName) throws IOException {
1590     splitRegion(regionName, null);
1591   }
1592
1593   /**
1594    * {@inheritDoc}
1595    */
1596   @Override
1597   public void split(final TableName tableName, final byte [] splitPoint) throws IOException {
1598     ZooKeeperWatcher zookeeper = null;
1599     try {
1600       checkTableExists(tableName);
1601       zookeeper = new ZooKeeperWatcher(conf, ZK_IDENTIFIER_PREFIX + connection.toString(),
1602         new ThrowableAbortable());
1603       List<Pair<HRegionInfo, ServerName>> pairs;
1604       if (TableName.META_TABLE_NAME.equals(tableName)) {
1605         pairs = new MetaTableLocator().getMetaRegionsAndLocations(zookeeper);
1606       } else {
1607         pairs = MetaTableAccessor.getTableRegionsAndLocations(connection, tableName);
1608       }
1609       for (Pair<HRegionInfo, ServerName> pair: pairs) {
1610         // May not be a server for a particular row
1611         if (pair.getSecond() == null) continue;
1612         HRegionInfo r = pair.getFirst();
1613         // check for parents
1614         if (r.isSplitParent()) continue;
1615         // if a split point given, only split that particular region
1616         if (r.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
1617            (splitPoint != null && !r.containsRow(splitPoint))) continue;
1618         // call out to region server to do split now
1619         split(pair.getSecond(), pair.getFirst(), splitPoint);
1620       }
1621     } finally {
1622       if (zookeeper != null) {
1623         zookeeper.close();
1624       }
1625     }
1626   }
1627
1628   @Override
1629   public void splitRegion(final byte[] regionName, final byte [] splitPoint) throws IOException {
1630     Pair<HRegionInfo, ServerName> regionServerPair = getRegion(regionName);
1631     if (regionServerPair == null) {
1632       throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName));
1633     }
1634     if (regionServerPair.getFirst() != null &&
1635         regionServerPair.getFirst().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1636       throw new IllegalArgumentException("Can't split replicas directly. "
1637           + "Replicas are auto-split when their primary is split.");
1638     }
1639     if (regionServerPair.getSecond() == null) {
1640       throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
1641     }
1642     split(regionServerPair.getSecond(), regionServerPair.getFirst(), splitPoint);
1643   }
1644
1645   @VisibleForTesting
1646   public void split(final ServerName sn, final HRegionInfo hri,
1647       byte[] splitPoint) throws IOException {
1648     if (hri.getStartKey() != null && splitPoint != null &&
1649          Bytes.compareTo(hri.getStartKey(), splitPoint) == 0) {
1650        throw new IOException("should not give a splitkey which equals to startkey!");
1651     }
1652     // TODO: There is no timeout on this controller. Set one!
1653     HBaseRpcController controller = rpcControllerFactory.newController();
1654     controller.setPriority(hri.getTable());
1655
1656     // TODO: this does not do retries, it should. Set priority and timeout in controller
1657     AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
1658     ProtobufUtil.split(controller, admin, hri, splitPoint);
1659   }
1660
1661   @Override
1662   public void modifyTable(final TableName tableName, final HTableDescriptor htd)
1663       throws IOException {
1664     get(modifyTableAsync(tableName, htd), syncWaitTimeout, TimeUnit.MILLISECONDS);
1665   }
1666
1667   @Override
1668   public Future<Void> modifyTableAsync(final TableName tableName, final HTableDescriptor htd)
1669       throws IOException {
1670     if (!tableName.equals(htd.getTableName())) {
1671       throw new IllegalArgumentException("the specified table name '" + tableName +
1672         "' doesn't match with the HTD one: " + htd.getTableName());
1673     }
1674     ModifyTableResponse response = executeCallable(
1675       new MasterCallable<ModifyTableResponse>(getConnection(), getRpcControllerFactory()) {
1676         @Override
1677         protected ModifyTableResponse rpcCall() throws Exception {
1678           setPriority(tableName);
1679           ModifyTableRequest request = RequestConverter.buildModifyTableRequest(
1680             tableName, htd, ng.getNonceGroup(), ng.newNonce());
1681           return master.modifyTable(getRpcController(), request);
1682         }
1683       });
1684     return new ModifyTableFuture(this, tableName, response);
1685   }
1686
1687   private static class ModifyTableFuture extends TableFuture<Void> {
1688     public ModifyTableFuture(final HBaseAdmin admin, final TableName tableName,
1689         final ModifyTableResponse response) {
1690       super(admin, tableName,
1691           (response != null && response.hasProcId()) ? response.getProcId() : null);
1692     }
1693
1694     public ModifyTableFuture(final HBaseAdmin admin, final TableName tableName, final Long procId) {
1695       super(admin, tableName, procId);
1696     }
1697
1698     @Override
1699     public String getOperationType() {
1700       return "MODIFY";
1701     }
1702
1703     @Override
1704     protected Void postOperationResult(final Void result, final long deadlineTs)
1705         throws IOException, TimeoutException {
1706       // The modify operation on the table is asynchronous on the server side irrespective
1707       // of whether Procedure V2 is supported or not. So, we wait in the client till
1708       // all regions get updated.
1709       waitForSchemaUpdate(deadlineTs);
1710       return result;
1711     }
1712   }
1713
1714   /**
1715    * @param regionName Name of a region.
1716    * @return a pair of HRegionInfo and ServerName if <code>regionName</code> is
1717    *  a verified region name (we call {@link
1718    *  MetaTableAccessor#getRegionLocation(Connection, byte[])}
1719    *  else null.
1720    * Throw IllegalArgumentException if <code>regionName</code> is null.
1721    * @throws IOException
1722    */
1723   Pair<HRegionInfo, ServerName> getRegion(final byte[] regionName) throws IOException {
1724     if (regionName == null) {
1725       throw new IllegalArgumentException("Pass a table name or region name");
1726     }
1727     Pair<HRegionInfo, ServerName> pair =
1728       MetaTableAccessor.getRegion(connection, regionName);
1729     if (pair == null) {
1730       final AtomicReference<Pair<HRegionInfo, ServerName>> result =
1731         new AtomicReference<Pair<HRegionInfo, ServerName>>(null);
1732       final String encodedName = Bytes.toString(regionName);
1733       MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
1734         @Override
1735         public boolean visit(Result data) throws IOException {
1736           HRegionInfo info = MetaTableAccessor.getHRegionInfo(data);
1737           if (info == null) {
1738             LOG.warn("No serialized HRegionInfo in " + data);
1739             return true;
1740           }
1741           RegionLocations rl = MetaTableAccessor.getRegionLocations(data);
1742           boolean matched = false;
1743           ServerName sn = null;
1744           if (rl != null) {
1745             for (HRegionLocation h : rl.getRegionLocations()) {
1746               if (h != null && encodedName.equals(h.getRegionInfo().getEncodedName())) {
1747                 sn = h.getServerName();
1748                 info = h.getRegionInfo();
1749                 matched = true;
1750               }
1751             }
1752           }
1753           if (!matched) return true;
1754           result.set(new Pair<HRegionInfo, ServerName>(info, sn));
1755           return false; // found the region, stop
1756         }
1757       };
1758
1759       MetaTableAccessor.fullScanRegions(connection, visitor);
1760       pair = result.get();
1761     }
1762     return pair;
1763   }
1764
1765   /**
1766    * If the input is a region name, it is returned as is. If it's an
1767    * encoded region name, the corresponding region is found from meta
1768    * and its region name is returned. If we can't find any region in
1769    * meta matching the input as either region name or encoded region
1770    * name, the input is returned as is. We don't throw unknown
1771    * region exception.
1772    */
1773   private byte[] getRegionName(
1774       final byte[] regionNameOrEncodedRegionName) throws IOException {
1775     if (Bytes.equals(regionNameOrEncodedRegionName,
1776         HRegionInfo.FIRST_META_REGIONINFO.getRegionName())
1777           || Bytes.equals(regionNameOrEncodedRegionName,
1778             HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
1779       return HRegionInfo.FIRST_META_REGIONINFO.getRegionName();
1780     }
1781     byte[] tmp = regionNameOrEncodedRegionName;
1782     Pair<HRegionInfo, ServerName> regionServerPair = getRegion(regionNameOrEncodedRegionName);
1783     if (regionServerPair != null && regionServerPair.getFirst() != null) {
1784       tmp = regionServerPair.getFirst().getRegionName();
1785     }
1786     return tmp;
1787   }
1788
1789   /**
1790    * Check if table exists or not
1791    * @param tableName Name of a table.
1792    * @return tableName instance
1793    * @throws IOException if a remote or network exception occurs.
1794    * @throws TableNotFoundException if table does not exist.
1795    */
1796   private TableName checkTableExists(final TableName tableName)
1797       throws IOException {
1798     return executeCallable(new RpcRetryingCallable<TableName>() {
1799       @Override
1800       protected TableName rpcCall(int callTimeout) throws Exception {
1801         if (!MetaTableAccessor.tableExists(connection, tableName)) {
1802           throw new TableNotFoundException(tableName);
1803         }
1804         return tableName;
1805       }
1806     });
1807   }
1808
1809   @Override
1810   public synchronized void shutdown() throws IOException {
1811     executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
1812       @Override
1813       protected Void rpcCall() throws Exception {
1814         setPriority(HConstants.HIGH_QOS);
1815         master.shutdown(getRpcController(), ShutdownRequest.newBuilder().build());
1816         return null;
1817       }
1818     });
1819   }
1820
1821   @Override
1822   public synchronized void stopMaster() throws IOException {
1823     executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
1824       @Override
1825       protected Void rpcCall() throws Exception {
1826         setPriority(HConstants.HIGH_QOS);
1827         master.stopMaster(getRpcController(), StopMasterRequest.newBuilder().build());
1828         return null;
1829       }
1830     });
1831   }
1832
1833   @Override
1834   public synchronized void stopRegionServer(final String hostnamePort)
1835   throws IOException {
1836     String hostname = Addressing.parseHostname(hostnamePort);
1837     int port = Addressing.parsePort(hostnamePort);
1838     final AdminService.BlockingInterface admin =
1839       this.connection.getAdmin(ServerName.valueOf(hostname, port, 0));
1840     // TODO: There is no timeout on this controller. Set one!
1841     HBaseRpcController controller = rpcControllerFactory.newController();
1842     controller.setPriority(HConstants.HIGH_QOS);
1843     StopServerRequest request = RequestConverter.buildStopServerRequest(
1844         "Called by admin client " + this.connection.toString());
1845     try {
1846       admin.stopServer(controller, request);
1847     } catch (Exception e) {
1848       throw ProtobufUtil.handleRemoteException(e);
1849     }
1850   }
1851
1852   @Override
1853   public boolean isMasterInMaintenanceMode() throws IOException {
1854     return executeCallable(new MasterCallable<IsInMaintenanceModeResponse>(getConnection(),
1855         this.rpcControllerFactory) {
1856       @Override
1857       protected IsInMaintenanceModeResponse rpcCall() throws Exception {
1858         return master.isMasterInMaintenanceMode(getRpcController(),
1859             IsInMaintenanceModeRequest.newBuilder().build());
1860       }
1861     }).getInMaintenanceMode();
1862   }
1863
1864   @Override
1865   public ClusterStatus getClusterStatus() throws IOException {
1866     return executeCallable(new MasterCallable<ClusterStatus>(getConnection(),
1867         this.rpcControllerFactory) {
1868       @Override
1869       protected ClusterStatus rpcCall() throws Exception {
1870         GetClusterStatusRequest req = RequestConverter.buildGetClusterStatusRequest();
1871         return ProtobufUtil.convert(master.getClusterStatus(getRpcController(), req).
1872             getClusterStatus());
1873       }
1874     });
1875   }
1876
1877   @Override
1878   public Configuration getConfiguration() {
1879     return this.conf;
1880   }
1881
1882   /**
1883    * Do a get with a timeout against the passed in <code>future<code>.
1884    */
1885   private static <T> T get(final Future<T> future, final long timeout, final TimeUnit units)
1886   throws IOException {
1887     try {
1888       // TODO: how long should we wait? Spin forever?
1889       return future.get(timeout, units);
1890     } catch (InterruptedException e) {
1891       throw new InterruptedIOException("Interrupt while waiting on " + future);
1892     } catch (TimeoutException e) {
1893       throw new TimeoutIOException(e);
1894     } catch (ExecutionException e) {
1895       if (e.getCause() instanceof IOException) {
1896         throw (IOException)e.getCause();
1897       } else {
1898         throw new IOException(e.getCause());
1899       }
1900     }
1901   }
1902
1903   @Override
1904   public void createNamespace(final NamespaceDescriptor descriptor)
1905   throws IOException {
1906     get(createNamespaceAsync(descriptor), this.syncWaitTimeout, TimeUnit.MILLISECONDS);
1907   }
1908
1909   @Override
1910   public Future<Void> createNamespaceAsync(final NamespaceDescriptor descriptor)
1911       throws IOException {
1912     CreateNamespaceResponse response =
1913         executeCallable(new MasterCallable<CreateNamespaceResponse>(getConnection(),
1914             getRpcControllerFactory()) {
1915       @Override
1916       protected CreateNamespaceResponse rpcCall() throws Exception {
1917         return master.createNamespace(getRpcController(),
1918           CreateNamespaceRequest.newBuilder().setNamespaceDescriptor(ProtobufUtil.
1919               toProtoNamespaceDescriptor(descriptor)).build());
1920       }
1921     });
1922     return new NamespaceFuture(this, descriptor.getName(), response.getProcId()) {
1923       @Override
1924       public String getOperationType() {
1925         return "CREATE_NAMESPACE";
1926       }
1927     };
1928   }
1929
1930   @Override
1931   public void modifyNamespace(final NamespaceDescriptor descriptor)
1932   throws IOException {
1933     get(modifyNamespaceAsync(descriptor), this.syncWaitTimeout, TimeUnit.MILLISECONDS);
1934   }
1935
1936   @Override
1937   public Future<Void> modifyNamespaceAsync(final NamespaceDescriptor descriptor)
1938       throws IOException {
1939     ModifyNamespaceResponse response =
1940         executeCallable(new MasterCallable<ModifyNamespaceResponse>(getConnection(),
1941             getRpcControllerFactory()) {
1942       @Override
1943       protected ModifyNamespaceResponse rpcCall() throws Exception {
1944         // TODO: set priority based on NS?
1945         return master.modifyNamespace(getRpcController(), ModifyNamespaceRequest.newBuilder().
1946           setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build());
1947        }
1948     });
1949     return new NamespaceFuture(this, descriptor.getName(), response.getProcId()) {
1950       @Override
1951       public String getOperationType() {
1952         return "MODIFY_NAMESPACE";
1953       }
1954     };
1955   }
1956
1957   @Override
1958   public void deleteNamespace(final String name)
1959   throws IOException {
1960     get(deleteNamespaceAsync(name), this.syncWaitTimeout, TimeUnit.MILLISECONDS);
1961   }
1962
1963   @Override
1964   public Future<Void> deleteNamespaceAsync(final String name)
1965       throws IOException {
1966     DeleteNamespaceResponse response =
1967         executeCallable(new MasterCallable<DeleteNamespaceResponse>(getConnection(),
1968             getRpcControllerFactory()) {
1969       @Override
1970       protected DeleteNamespaceResponse rpcCall() throws Exception {
1971         // TODO: set priority based on NS?
1972         return master.deleteNamespace(getRpcController(), DeleteNamespaceRequest.newBuilder().
1973           setNamespaceName(name).build());
1974         }
1975       });
1976     return new NamespaceFuture(this, name, response.getProcId()) {
1977       @Override
1978       public String getOperationType() {
1979         return "DELETE_NAMESPACE";
1980       }
1981     };
1982   }
1983
1984   @Override
1985   public NamespaceDescriptor getNamespaceDescriptor(final String name)
1986       throws NamespaceNotFoundException, IOException {
1987     return executeCallable(new MasterCallable<NamespaceDescriptor>(getConnection(),
1988         getRpcControllerFactory()) {
1989       @Override
1990       protected NamespaceDescriptor rpcCall() throws Exception {
1991         return ProtobufUtil.toNamespaceDescriptor(
1992             master.getNamespaceDescriptor(getRpcController(),
1993                 GetNamespaceDescriptorRequest.newBuilder().
1994                   setNamespaceName(name).build()).getNamespaceDescriptor());
1995       }
1996     });
1997   }
1998
1999   @Override
2000   public NamespaceDescriptor[] listNamespaceDescriptors() throws IOException {
2001     return executeCallable(new MasterCallable<NamespaceDescriptor[]>(getConnection(),
2002         getRpcControllerFactory()) {
2003       @Override
2004       protected NamespaceDescriptor[] rpcCall() throws Exception {
2005         List<HBaseProtos.NamespaceDescriptor> list =
2006             master.listNamespaceDescriptors(getRpcController(),
2007               ListNamespaceDescriptorsRequest.newBuilder().build()).getNamespaceDescriptorList();
2008         NamespaceDescriptor[] res = new NamespaceDescriptor[list.size()];
2009         for(int i = 0; i < list.size(); i++) {
2010           res[i] = ProtobufUtil.toNamespaceDescriptor(list.get(i));
2011         }
2012         return res;
2013       }
2014     });
2015   }
2016
2017   @Override
2018   public ProcedureInfo[] listProcedures() throws IOException {
2019     return executeCallable(new MasterCallable<ProcedureInfo[]>(getConnection(),
2020         getRpcControllerFactory()) {
2021       @Override
2022       protected ProcedureInfo[] rpcCall() throws Exception {
2023         List<ProcedureProtos.Procedure> procList = master.listProcedures(
2024             getRpcController(), ListProceduresRequest.newBuilder().build()).getProcedureList();
2025         ProcedureInfo[] procInfoList = new ProcedureInfo[procList.size()];
2026         for (int i = 0; i < procList.size(); i++) {
2027           procInfoList[i] = ProcedureUtil.convert(procList.get(i));
2028         }
2029         return procInfoList;
2030       }
2031     });
2032   }
2033
2034   @Override
2035   public HTableDescriptor[] listTableDescriptorsByNamespace(final String name) throws IOException {
2036     return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection(),
2037         getRpcControllerFactory()) {
2038       @Override
2039       protected HTableDescriptor[] rpcCall() throws Exception {
2040         List<TableSchema> list =
2041             master.listTableDescriptorsByNamespace(getRpcController(),
2042                 ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name)
2043                 .build()).getTableSchemaList();
2044         HTableDescriptor[] res = new HTableDescriptor[list.size()];
2045         for(int i=0; i < list.size(); i++) {
2046
2047           res[i] = ProtobufUtil.convertToHTableDesc(list.get(i));
2048         }
2049         return res;
2050       }
2051     });
2052   }
2053
2054   @Override
2055   public TableName[] listTableNamesByNamespace(final String name) throws IOException {
2056     return executeCallable(new MasterCallable<TableName[]>(getConnection(),
2057         getRpcControllerFactory()) {
2058       @Override
2059       protected TableName[] rpcCall() throws Exception {
2060         List<HBaseProtos.TableName> tableNames =
2061             master.listTableNamesByNamespace(getRpcController(), ListTableNamesByNamespaceRequest.
2062                 newBuilder().setNamespaceName(name).build())
2063             .getTableNameList();
2064         TableName[] result = new TableName[tableNames.size()];
2065         for (int i = 0; i < tableNames.size(); i++) {
2066           result[i] = ProtobufUtil.toTableName(tableNames.get(i));
2067         }
2068         return result;
2069       }
2070     });
2071   }
2072
2073   /**
2074    * Check to see if HBase is running. Throw an exception if not.
2075    * @param conf system configuration
2076    * @throws MasterNotRunningException if the master is not running
2077    * @throws ZooKeeperConnectionException if unable to connect to zookeeper
2078    * @deprecated since hbase-2.0.0 because throws a ServiceException. We don't want to have
2079    * protobuf as part of our public API. Use {@link #available(Configuration)}
2080    */
2081   // Used by tests and by the Merge tool. Merge tool uses it to figure if HBase is up or not.
2082   // MOB uses it too.
2083   // NOTE: hbase-2.0.0 removes ServiceException from the throw.
2084   @Deprecated
2085   public static void checkHBaseAvailable(Configuration conf)
2086   throws MasterNotRunningException, ZooKeeperConnectionException, IOException,
2087   com.google.protobuf.ServiceException {
2088     available(conf);
2089   }
2090
2091   /**
2092    * Is HBase available? Throw an exception if not.
2093    * @param conf system configuration
2094    * @throws ZooKeeperConnectionException if unable to connect to zookeeper]
2095    */
2096   public static void available(final Configuration conf)
2097   throws ZooKeeperConnectionException, InterruptedIOException {
2098     Configuration copyOfConf = HBaseConfiguration.create(conf);
2099     // We set it to make it fail as soon as possible if HBase is not available
2100     copyOfConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
2101     copyOfConf.setInt("zookeeper.recovery.retry", 0);
2102
2103     // Check ZK first.
2104     // If the connection exists, we may have a connection to ZK that does not work anymore
2105     try (ClusterConnection connection =
2106              (ClusterConnection) ConnectionFactory.createConnection(copyOfConf);
2107          ZooKeeperKeepAliveConnection zkw = ((ConnectionImplementation) connection).
2108              getKeepAliveZooKeeperWatcher();) {
2109       // This is NASTY. FIX!!!! Dependent on internal implementation! TODO
2110       zkw.getRecoverableZooKeeper().getZooKeeper().exists(zkw.baseZNode, false);
2111       connection.isMasterRunning();
2112     } catch (IOException e) {
2113       throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
2114     } catch (InterruptedException e) {
2115       throw (InterruptedIOException)
2116           new InterruptedIOException("Can't connect to ZooKeeper").initCause(e);
2117     } catch (KeeperException e) {
2118       throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
2119     }
2120   }
2121
2122   @Override
2123   public List<HRegionInfo> getTableRegions(final TableName tableName)
2124   throws IOException {
2125     ZooKeeperWatcher zookeeper =
2126       new ZooKeeperWatcher(conf, ZK_IDENTIFIER_PREFIX + connection.toString(),
2127         new ThrowableAbortable());
2128     List<HRegionInfo> regions = null;
2129     try {
2130       if (TableName.META_TABLE_NAME.equals(tableName)) {
2131         regions = new MetaTableLocator().getMetaRegions(zookeeper);
2132       } else {
2133         regions = MetaTableAccessor.getTableRegions(connection, tableName, true);
2134       }
2135     } finally {
2136       zookeeper.close();
2137     }
2138     return regions;
2139   }
2140
2141   @Override
2142   public synchronized void close() throws IOException {
2143   }
2144
2145   @Override
2146   public HTableDescriptor[] getTableDescriptorsByTableName(final List<TableName> tableNames)
2147   throws IOException {
2148     return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection(),
2149         getRpcControllerFactory()) {
2150       @Override
2151       protected HTableDescriptor[] rpcCall() throws Exception {
2152         GetTableDescriptorsRequest req =
2153             RequestConverter.buildGetTableDescriptorsRequest(tableNames);
2154           return ProtobufUtil.
2155               getHTableDescriptorArray(master.getTableDescriptors(getRpcController(), req));
2156       }
2157     });
2158   }
2159
2160   /**
2161    * Get tableDescriptor
2162    * @param tableName one table name
2163    * @return HTD the HTableDescriptor or null if the table not exists
2164    * @throws IOException if a remote or network exception occurs
2165    */
2166   private HTableDescriptor getTableDescriptorByTableName(TableName tableName)
2167       throws IOException {
2168     List<TableName> tableNames = new ArrayList<TableName>(1);
2169     tableNames.add(tableName);
2170
2171     HTableDescriptor[] htdl = getTableDescriptorsByTableName(tableNames);
2172
2173     if (htdl == null || htdl.length == 0) {
2174       return null;
2175     }
2176     else {
2177       return htdl[0];
2178     }
2179   }
2180
2181   @Override
2182   public HTableDescriptor[] getTableDescriptors(List<String> names)
2183   throws IOException {
2184     List<TableName> tableNames = new ArrayList<TableName>(names.size());
2185     for(String name : names) {
2186       tableNames.add(TableName.valueOf(name));
2187     }
2188     return getTableDescriptorsByTableName(tableNames);
2189   }
2190
2191   private RollWALWriterResponse rollWALWriterImpl(final ServerName sn) throws IOException,
2192       FailedLogCloseException {
2193     final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
2194     RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest();
2195     // TODO: There is no timeout on this controller. Set one!
2196     HBaseRpcController controller = rpcControllerFactory.newController();
2197     try {
2198       return admin.rollWALWriter(controller, request);
2199     } catch (ServiceException e) {
2200       throw ProtobufUtil.handleRemoteException(e);
2201     }
2202   }
2203
2204   /**
2205    * Roll the log writer. I.e. when using a file system based write ahead log,
2206    * start writing log messages to a new file.
2207    *
2208    * Note that when talking to a version 1.0+ HBase deployment, the rolling is asynchronous.
2209    * This method will return as soon as the roll is requested and the return value will
2210    * always be null. Additionally, the named region server may schedule store flushes at the
2211    * request of the wal handling the roll request.
2212    *
2213    * When talking to a 0.98 or older HBase deployment, the rolling is synchronous and the
2214    * return value may be either null or a list of encoded region names.
2215    *
2216    * @param serverName
2217    *          The servername of the regionserver. A server name is made of host,
2218    *          port and startcode. This is mandatory. Here is an example:
2219    *          <code> host187.example.com,60020,1289493121758</code>
2220    * @return a set of {@link HRegionInfo#getEncodedName()} that would allow the wal to
2221    *         clean up some underlying files. null if there's nothing to flush.
2222    * @throws IOException if a remote or network exception occurs
2223    * @throws FailedLogCloseException
2224    * @deprecated use {@link #rollWALWriter(ServerName)}
2225    */
2226   @Deprecated
2227   public synchronized byte[][] rollHLogWriter(String serverName)
2228       throws IOException, FailedLogCloseException {
2229     ServerName sn = ServerName.valueOf(serverName);
2230     final RollWALWriterResponse response = rollWALWriterImpl(sn);
2231     int regionCount = response.getRegionToFlushCount();
2232     if (0 == regionCount) {
2233       return null;
2234     }
2235     byte[][] regionsToFlush = new byte[regionCount][];
2236     for (int i = 0; i < regionCount; i++) {
2237       regionsToFlush[i] = ProtobufUtil.toBytes(response.getRegionToFlush(i));
2238     }
2239     return regionsToFlush;
2240   }
2241
2242   @Override
2243   public synchronized void rollWALWriter(ServerName serverName)
2244       throws IOException, FailedLogCloseException {
2245     rollWALWriterImpl(serverName);
2246   }
2247
2248   @Override
2249   public String[] getMasterCoprocessors() {
2250     try {
2251       return getClusterStatus().getMasterCoprocessors();
2252     } catch (IOException e) {
2253       LOG.error("Could not getClusterStatus()",e);
2254       return null;
2255     }
2256   }
2257
2258   @Override
2259   public CompactionState getCompactionState(final TableName tableName)
2260   throws IOException {
2261     return getCompactionState(tableName, CompactType.NORMAL);
2262   }
2263
2264   @Override
2265   public CompactionState getCompactionStateForRegion(final byte[] regionName)
2266   throws IOException {
2267     final Pair<HRegionInfo, ServerName> regionServerPair = getRegion(regionName);
2268     if (regionServerPair == null) {
2269       throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName));
2270     }
2271     if (regionServerPair.getSecond() == null) {
2272       throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
2273     }
2274     ServerName sn = regionServerPair.getSecond();
2275     final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
2276     // TODO: There is no timeout on this controller. Set one!
2277     HBaseRpcController controller = rpcControllerFactory.newController();
2278     GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
2279       regionServerPair.getFirst().getRegionName(), true);
2280     GetRegionInfoResponse response;
2281     try {
2282       response = admin.getRegionInfo(controller, request);
2283     } catch (ServiceException e) {
2284       throw ProtobufUtil.handleRemoteException(e);
2285     }
2286     if (response.getCompactionState() != null) {
2287       return ProtobufUtil.createCompactionState(response.getCompactionState());
2288     }
2289     return null;
2290   }
2291
2292   @Override
2293   public void snapshot(final String snapshotName,
2294                        final TableName tableName) throws IOException,
2295       SnapshotCreationException, IllegalArgumentException {
2296     snapshot(snapshotName, tableName, SnapshotType.FLUSH);
2297   }
2298
2299   @Override
2300   public void snapshot(final byte[] snapshotName, final TableName tableName)
2301       throws IOException, SnapshotCreationException, IllegalArgumentException {
2302     snapshot(Bytes.toString(snapshotName), tableName, SnapshotType.FLUSH);
2303   }
2304
2305   @Override
2306   public void snapshot(final String snapshotName, final TableName tableName,
2307       SnapshotType type)
2308       throws IOException, SnapshotCreationException, IllegalArgumentException {
2309     snapshot(new SnapshotDescription(snapshotName, tableName.getNameAsString(), type));
2310   }
2311
2312   @Override
2313   public void snapshot(SnapshotDescription snapshotDesc)
2314       throws IOException, SnapshotCreationException, IllegalArgumentException {
2315     // actually take the snapshot
2316     HBaseProtos.SnapshotDescription snapshot = createHBaseProtosSnapshotDesc(snapshotDesc);
2317     SnapshotResponse response = asyncSnapshot(snapshot);
2318     final IsSnapshotDoneRequest request =
2319         IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build();
2320     IsSnapshotDoneResponse done = null;
2321     long start = EnvironmentEdgeManager.currentTime();
2322     long max = response.getExpectedTimeout();
2323     long maxPauseTime = max / this.numRetries;
2324     int tries = 0;
2325     LOG.debug("Waiting a max of " + max + " ms for snapshot '" +
2326         ClientSnapshotDescriptionUtils.toString(snapshot) + "'' to complete. (max " +
2327         maxPauseTime + " ms per retry)");
2328     while (tries == 0
2329         || ((EnvironmentEdgeManager.currentTime() - start) < max && !done.getDone())) {
2330       try {
2331         // sleep a backoff <= pauseTime amount
2332         long sleep = getPauseTime(tries++);
2333         sleep = sleep > maxPauseTime ? maxPauseTime : sleep;
2334         LOG.debug("(#" + tries + ") Sleeping: " + sleep +
2335           "ms while waiting for snapshot completion.");
2336         Thread.sleep(sleep);
2337       } catch (InterruptedException e) {
2338         throw (InterruptedIOException)new InterruptedIOException("Interrupted").initCause(e);
2339       }
2340       LOG.debug("Getting current status of snapshot from master...");
2341       done = executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection(),
2342           getRpcControllerFactory()) {
2343         @Override
2344         protected IsSnapshotDoneResponse rpcCall() throws Exception {
2345           return master.isSnapshotDone(getRpcController(), request);
2346         }
2347       });
2348     }
2349     if (!done.getDone()) {
2350       throw new SnapshotCreationException("Snapshot '" + snapshot.getName()
2351           + "' wasn't completed in expectedTime:" + max + " ms", snapshotDesc);
2352     }
2353   }
2354
2355   @Override
2356   public void takeSnapshotAsync(SnapshotDescription snapshotDesc) throws IOException,
2357       SnapshotCreationException {
2358     HBaseProtos.SnapshotDescription snapshot = createHBaseProtosSnapshotDesc(snapshotDesc);
2359     asyncSnapshot(snapshot);
2360   }
2361
2362   private HBaseProtos.SnapshotDescription
2363       createHBaseProtosSnapshotDesc(SnapshotDescription snapshotDesc) {
2364     HBaseProtos.SnapshotDescription.Builder builder = HBaseProtos.SnapshotDescription.newBuilder();
2365     if (snapshotDesc.getTable() != null) {
2366       builder.setTable(snapshotDesc.getTable());
2367     }
2368     if (snapshotDesc.getName() != null) {
2369       builder.setName(snapshotDesc.getName());
2370     }
2371     if (snapshotDesc.getOwner() != null) {
2372       builder.setOwner(snapshotDesc.getOwner());
2373     }
2374     if (snapshotDesc.getCreationTime() != -1) {
2375       builder.setCreationTime(snapshotDesc.getCreationTime());
2376     }
2377     if (snapshotDesc.getVersion() != -1) {
2378       builder.setVersion(snapshotDesc.getVersion());
2379     }
2380     builder.setType(ProtobufUtil.createProtosSnapShotDescType(snapshotDesc.getType()));
2381     HBaseProtos.SnapshotDescription snapshot = builder.build();
2382     return snapshot;
2383   }
2384
2385   private SnapshotResponse asyncSnapshot(HBaseProtos.SnapshotDescription snapshot)
2386       throws IOException {
2387     ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2388     final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot)
2389         .build();
2390     // run the snapshot on the master
2391     return executeCallable(new MasterCallable<SnapshotResponse>(getConnection(),
2392         getRpcControllerFactory()) {
2393       @Override
2394       protected SnapshotResponse rpcCall() throws Exception {
2395         return master.snapshot(getRpcController(), request);
2396       }
2397     });
2398   }
2399
2400   @Override
2401   public boolean isSnapshotFinished(final SnapshotDescription snapshotDesc)
2402       throws IOException, HBaseSnapshotException, UnknownSnapshotException {
2403     final HBaseProtos.SnapshotDescription snapshot = createHBaseProtosSnapshotDesc(snapshotDesc);
2404     return executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection(),
2405         getRpcControllerFactory()) {
2406       @Override
2407       protected IsSnapshotDoneResponse rpcCall() throws Exception {
2408         return master.isSnapshotDone(getRpcController(),
2409           IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build());
2410       }
2411     }).getDone();
2412   }
2413
2414   @Override
2415   public void restoreSnapshot(final byte[] snapshotName)
2416       throws IOException, RestoreSnapshotException {
2417     restoreSnapshot(Bytes.toString(snapshotName));
2418   }
2419
2420   @Override
2421   public void restoreSnapshot(final String snapshotName)
2422       throws IOException, RestoreSnapshotException {
2423     boolean takeFailSafeSnapshot =
2424       conf.getBoolean("hbase.snapshot.restore.take.failsafe.snapshot", false);
2425     restoreSnapshot(snapshotName, takeFailSafeSnapshot);
2426   }
2427
2428   @Override
2429   public void restoreSnapshot(final byte[] snapshotName, final boolean takeFailSafeSnapshot)
2430       throws IOException, RestoreSnapshotException {
2431     restoreSnapshot(Bytes.toString(snapshotName), takeFailSafeSnapshot);
2432   }
2433
2434   /*
2435    * Check whether the snapshot exists and contains disabled table
2436    *
2437    * @param snapshotName name of the snapshot to restore
2438    * @throws IOException if a remote or network exception occurs
2439    * @throws RestoreSnapshotException if no valid snapshot is found
2440    */
2441   private TableName getTableNameBeforeRestoreSnapshot(final String snapshotName)
2442       throws IOException, RestoreSnapshotException {
2443     TableName tableName = null;
2444     for (SnapshotDescription snapshotInfo: listSnapshots()) {
2445       if (snapshotInfo.getName().equals(snapshotName)) {
2446         tableName = TableName.valueOf(snapshotInfo.getTable());
2447         break;
2448       }
2449     }
2450
2451     if (tableName == null) {
2452       throw new RestoreSnapshotException(
2453         "Unable to find the table name for snapshot=" + snapshotName);
2454     }
2455     return tableName;
2456   }
2457
2458   @Override
2459   public void restoreSnapshot(final String snapshotName, final boolean takeFailSafeSnapshot)
2460       throws IOException, RestoreSnapshotException {
2461     TableName tableName = getTableNameBeforeRestoreSnapshot(snapshotName);
2462
2463     // The table does not exists, switch to clone.
2464     if (!tableExists(tableName)) {
2465       cloneSnapshot(snapshotName, tableName);
2466       return;
2467     }
2468
2469     // Check if the table is disabled
2470     if (!isTableDisabled(tableName)) {
2471       throw new TableNotDisabledException(tableName);
2472     }
2473
2474     // Take a snapshot of the current state
2475     String failSafeSnapshotSnapshotName = null;
2476     if (takeFailSafeSnapshot) {
2477       failSafeSnapshotSnapshotName = conf.get("hbase.snapshot.restore.failsafe.name",
2478         "hbase-failsafe-{snapshot.name}-{restore.timestamp}");
2479       failSafeSnapshotSnapshotName = failSafeSnapshotSnapshotName
2480         .replace("{snapshot.name}", snapshotName)
2481         .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
2482         .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
2483       LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2484       snapshot(failSafeSnapshotSnapshotName, tableName);
2485     }
2486
2487     try {
2488       // Restore snapshot
2489       get(
2490         internalRestoreSnapshotAsync(snapshotName, tableName),
2491         syncWaitTimeout,
2492         TimeUnit.MILLISECONDS);
2493     } catch (IOException e) {
2494       // Somthing went wrong during the restore...
2495       // if the pre-restore snapshot is available try to rollback
2496       if (takeFailSafeSnapshot) {
2497         try {
2498           get(
2499             internalRestoreSnapshotAsync(failSafeSnapshotSnapshotName, tableName),
2500             syncWaitTimeout,
2501             TimeUnit.MILLISECONDS);
2502           String msg = "Restore snapshot=" + snapshotName +
2503             " failed. Rollback to snapshot=" + failSafeSnapshotSnapshotName + " succeeded.";
2504           LOG.error(msg, e);
2505           throw new RestoreSnapshotException(msg, e);
2506         } catch (IOException ex) {
2507           String msg = "Failed to restore and rollback to snapshot=" + failSafeSnapshotSnapshotName;
2508           LOG.error(msg, ex);
2509           throw new RestoreSnapshotException(msg, e);
2510         }
2511       } else {
2512         throw new RestoreSnapshotException("Failed to restore snapshot=" + snapshotName, e);
2513       }
2514     }
2515
2516     // If the restore is succeeded, delete the pre-restore snapshot
2517     if (takeFailSafeSnapshot) {
2518       try {
2519         LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2520         deleteSnapshot(failSafeSnapshotSnapshotName);
2521       } catch (IOException e) {
2522         LOG.error("Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, e);
2523       }
2524     }
2525   }
2526
2527   @Override
2528   public Future<Void> restoreSnapshotAsync(final String snapshotName)
2529       throws IOException, RestoreSnapshotException {
2530     TableName tableName = getTableNameBeforeRestoreSnapshot(snapshotName);
2531
2532     // The table does not exists, switch to clone.
2533     if (!tableExists(tableName)) {
2534       return cloneSnapshotAsync(snapshotName, tableName);
2535     }
2536
2537     // Check if the table is disabled
2538     if (!isTableDisabled(tableName)) {
2539       throw new TableNotDisabledException(tableName);
2540     }
2541
2542     return internalRestoreSnapshotAsync(snapshotName, tableName);
2543   }
2544
2545   @Override
2546   public void cloneSnapshot(final byte[] snapshotName, final TableName tableName)
2547       throws IOException, TableExistsException, RestoreSnapshotException {
2548     cloneSnapshot(Bytes.toString(snapshotName), tableName);
2549   }
2550
2551   @Override
2552   public void cloneSnapshot(final String snapshotName, final TableName tableName)
2553       throws IOException, TableExistsException, RestoreSnapshotException {
2554     if (tableExists(tableName)) {
2555       throw new TableExistsException(tableName);
2556     }
2557     get(
2558       internalRestoreSnapshotAsync(snapshotName, tableName),
2559       Integer.MAX_VALUE,
2560       TimeUnit.MILLISECONDS);
2561   }
2562
2563   @Override
2564   public Future<Void> cloneSnapshotAsync(final String snapshotName, final TableName tableName)
2565       throws IOException, TableExistsException {
2566     if (tableExists(tableName)) {
2567       throw new TableExistsException(tableName);
2568     }
2569     return internalRestoreSnapshotAsync(snapshotName, tableName);
2570   }
2571
2572   @Override
2573   public byte[] execProcedureWithRet(String signature, String instance, Map<String, String> props)
2574       throws IOException {
2575     ProcedureDescription.Builder builder = ProcedureDescription.newBuilder();
2576     builder.setSignature(signature).setInstance(instance);
2577     for (Entry<String, String> entry : props.entrySet()) {
2578       NameStringPair pair = NameStringPair.newBuilder().setName(entry.getKey())
2579           .setValue(entry.getValue()).build();
2580       builder.addConfiguration(pair);
2581     }
2582
2583     final ExecProcedureRequest request = ExecProcedureRequest.newBuilder()
2584         .setProcedure(builder.build()).build();
2585     // run the procedure on the master
2586     ExecProcedureResponse response = executeCallable(new MasterCallable<ExecProcedureResponse>(
2587         getConnection(), getRpcControllerFactory()) {
2588       @Override
2589       protected ExecProcedureResponse rpcCall() throws Exception {
2590         return master.execProcedureWithRet(getRpcController(), request);
2591       }
2592     });
2593
2594     return response.hasReturnData() ? response.getReturnData().toByteArray() : null;
2595   }
2596
2597   @Override
2598   public void execProcedure(String signature, String instance, Map<String, String> props)
2599       throws IOException {
2600     ProcedureDescription.Builder builder = ProcedureDescription.newBuilder();
2601     builder.setSignature(signature).setInstance(instance);
2602     for (Entry<String, String> entry : props.entrySet()) {
2603       NameStringPair pair = NameStringPair.newBuilder().setName(entry.getKey())
2604           .setValue(entry.getValue()).build();
2605       builder.addConfiguration(pair);
2606     }
2607
2608     final ExecProcedureRequest request = ExecProcedureRequest.newBuilder()
2609         .setProcedure(builder.build()).build();
2610     // run the procedure on the master
2611     ExecProcedureResponse response = executeCallable(new MasterCallable<ExecProcedureResponse>(
2612         getConnection(), getRpcControllerFactory()) {
2613       @Override
2614       protected ExecProcedureResponse rpcCall() throws Exception {
2615         return master.execProcedure(getRpcController(), request);
2616       }
2617     });
2618
2619     long start = EnvironmentEdgeManager.currentTime();
2620     long max = response.getExpectedTimeout();
2621     long maxPauseTime = max / this.numRetries;
2622     int tries = 0;
2623     LOG.debug("Waiting a max of " + max + " ms for procedure '" +
2624         signature + " : " + instance + "'' to complete. (max " + maxPauseTime + " ms per retry)");
2625     boolean done = false;
2626     while (tries == 0
2627         || ((EnvironmentEdgeManager.currentTime() - start) < max && !done)) {
2628       try {
2629         // sleep a backoff <= pauseTime amount
2630         long sleep = getPauseTime(tries++);
2631         sleep = sleep > maxPauseTime ? maxPauseTime : sleep;
2632         LOG.debug("(#" + tries + ") Sleeping: " + sleep +
2633           "ms while waiting for procedure completion.");
2634         Thread.sleep(sleep);
2635       } catch (InterruptedException e) {
2636         throw (InterruptedIOException)new InterruptedIOException("Interrupted").initCause(e);
2637       }
2638       LOG.debug("Getting current status of procedure from master...");
2639       done = isProcedureFinished(signature, instance, props);
2640     }
2641     if (!done) {
2642       throw new IOException("Procedure '" + signature + " : " + instance
2643           + "' wasn't completed in expectedTime:" + max + " ms");
2644     }
2645   }
2646
2647   @Override
2648   public boolean isProcedureFinished(String signature, String instance, Map<String, String> props)
2649       throws IOException {
2650     final ProcedureDescription.Builder builder = ProcedureDescription.newBuilder();
2651     builder.setSignature(signature).setInstance(instance);
2652     for (Entry<String, String> entry : props.entrySet()) {
2653       NameStringPair pair = NameStringPair.newBuilder().setName(entry.getKey())
2654           .setValue(entry.getValue()).build();
2655       builder.addConfiguration(pair);
2656     }
2657     final ProcedureDescription desc = builder.build();
2658     return executeCallable(
2659         new MasterCallable<IsProcedureDoneResponse>(getConnection(), getRpcControllerFactory()) {
2660           @Override
2661           protected IsProcedureDoneResponse rpcCall() throws Exception {
2662             return master.isProcedureDone(getRpcController(), IsProcedureDoneRequest
2663                 .newBuilder().setProcedure(desc).build());
2664           }
2665         }).getDone();
2666   }
2667
2668   /**
2669    * Execute Restore/Clone snapshot and wait for the server to complete (blocking).
2670    * To check if the cloned table exists, use {@link #isTableAvailable} -- it is not safe to
2671    * create an HTable instance to this table before it is available.
2672    * @param snapshotName snapshot to restore
2673    * @param tableName table name to restore the snapshot on
2674    * @throws IOException if a remote or network exception occurs
2675    * @throws RestoreSnapshotException if snapshot failed to be restored
2676    * @throws IllegalArgumentException if the restore request is formatted incorrectly
2677    */
2678   private Future<Void> internalRestoreSnapshotAsync(
2679       final String snapshotName,
2680       final TableName tableName) throws IOException, RestoreSnapshotException {
2681     final HBaseProtos.SnapshotDescription snapshot = HBaseProtos.SnapshotDescription.newBuilder()
2682         .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2683
2684     // actually restore the snapshot
2685     ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2686
2687     RestoreSnapshotResponse response = executeCallable(
2688         new MasterCallable<RestoreSnapshotResponse>(getConnection(), getRpcControllerFactory()) {
2689       @Override
2690       protected RestoreSnapshotResponse rpcCall() throws Exception {
2691         final RestoreSnapshotRequest request = RestoreSnapshotRequest.newBuilder()
2692             .setSnapshot(snapshot)
2693             .setNonceGroup(ng.getNonceGroup())
2694             .setNonce(ng.newNonce())
2695             .build();
2696         return master.restoreSnapshot(getRpcController(), request);
2697       }
2698     });
2699
2700     return new RestoreSnapshotFuture(
2701       this, snapshot, TableName.valueOf(snapshot.getTable()), response);
2702   }
2703
2704   private static class RestoreSnapshotFuture extends TableFuture<Void> {
2705     public RestoreSnapshotFuture(
2706         final HBaseAdmin admin,
2707         final HBaseProtos.SnapshotDescription snapshot,
2708         final TableName tableName,
2709         final RestoreSnapshotResponse response) {
2710       super(admin, tableName,
2711           (response != null && response.hasProcId()) ? response.getProcId() : null);
2712
2713       if (response != null && !response.hasProcId()) {
2714         throw new UnsupportedOperationException("Client could not call old version of Server");
2715       }
2716     }
2717
2718     public RestoreSnapshotFuture(
2719         final HBaseAdmin admin,
2720         final TableName tableName,
2721         final Long procId) {
2722       super(admin, tableName, procId);
2723     }
2724
2725     @Override
2726     public String getOperationType() {
2727       return "MODIFY";
2728     }
2729   }
2730
2731   @Override
2732   public List<SnapshotDescription> listSnapshots() throws IOException {
2733     return executeCallable(new MasterCallable<List<SnapshotDescription>>(getConnection(),
2734         getRpcControllerFactory()) {
2735       @Override
2736       protected List<SnapshotDescription> rpcCall() throws Exception {
2737         List<HBaseProtos.SnapshotDescription> snapshotsList = master
2738             .getCompletedSnapshots(getRpcController(),
2739                 GetCompletedSnapshotsRequest.newBuilder().build())
2740             .getSnapshotsList();
2741         List<SnapshotDescription> result = new ArrayList<SnapshotDescription>(snapshotsList.size());
2742         for (HBaseProtos.SnapshotDescription snapshot : snapshotsList) {
2743           result.add(new SnapshotDescription(snapshot.getName(), snapshot.getTable(),
2744               ProtobufUtil.createSnapshotType(snapshot.getType()), snapshot.getOwner(),
2745               snapshot.getCreationTime(), snapshot.getVersion()));
2746         }
2747         return result;
2748       }
2749     });
2750   }
2751
2752   @Override
2753   public List<SnapshotDescription> listSnapshots(String regex) throws IOException {
2754     return listSnapshots(Pattern.compile(regex));
2755   }
2756
2757   @Override
2758   public List<SnapshotDescription> listSnapshots(Pattern pattern) throws IOException {
2759     List<SnapshotDescription> matched = new LinkedList<SnapshotDescription>();
2760     List<SnapshotDescription> snapshots = listSnapshots();
2761     for (SnapshotDescription snapshot : snapshots) {
2762       if (pattern.matcher(snapshot.getName()).matches()) {
2763         matched.add(snapshot);
2764       }
2765     }
2766     return matched;
2767   }
2768
2769   @Override
2770   public List<SnapshotDescription> listTableSnapshots(String tableNameRegex,
2771       String snapshotNameRegex) throws IOException {
2772     return listTableSnapshots(Pattern.compile(tableNameRegex), Pattern.compile(snapshotNameRegex));
2773   }
2774
2775   @Override
2776   public List<SnapshotDescription> listTableSnapshots(Pattern tableNamePattern,
2777       Pattern snapshotNamePattern) throws IOException {
2778     TableName[] tableNames = listTableNames(tableNamePattern);
2779
2780     List<SnapshotDescription> tableSnapshots = new LinkedList<SnapshotDescription>();
2781     List<SnapshotDescription> snapshots = listSnapshots(snapshotNamePattern);
2782
2783     List<TableName> listOfTableNames = Arrays.asList(tableNames);
2784     for (SnapshotDescription snapshot : snapshots) {
2785       if (listOfTableNames.contains(TableName.valueOf(snapshot.getTable()))) {
2786         tableSnapshots.add(snapshot);
2787       }
2788     }
2789     return tableSnapshots;
2790   }
2791
2792   @Override
2793   public void deleteSnapshot(final byte[] snapshotName) throws IOException {
2794     deleteSnapshot(Bytes.toString(snapshotName));
2795   }
2796
2797   @Override
2798   public void deleteSnapshot(final String snapshotName) throws IOException {
2799     // make sure the snapshot is possibly valid
2800     TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(snapshotName));
2801     // do the delete
2802     executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
2803       @Override
2804       protected Void rpcCall() throws Exception {
2805         master.deleteSnapshot(getRpcController(),
2806           DeleteSnapshotRequest.newBuilder().setSnapshot(
2807                 HBaseProtos.SnapshotDescription.newBuilder().setName(snapshotName).build())
2808               .build()
2809         );
2810         return null;
2811       }
2812     });
2813   }
2814
2815   @Override
2816   public void deleteSnapshots(final String regex) throws IOException {
2817     deleteSnapshots(Pattern.compile(regex));
2818   }
2819
2820   @Override
2821   public void deleteSnapshots(final Pattern pattern) throws IOException {
2822     List<SnapshotDescription> snapshots = listSnapshots(pattern);
2823     for (final SnapshotDescription snapshot : snapshots) {
2824       try {
2825         internalDeleteSnapshot(snapshot);
2826       } catch (IOException ex) {
2827         LOG.info(
2828           "Failed to delete snapshot " + snapshot.getName() + " for table " + snapshot.getTable(),
2829           ex);
2830       }
2831     }
2832   }
2833
2834   private void internalDeleteSnapshot(final SnapshotDescription snapshot) throws IOException {
2835     executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
2836       @Override
2837       protected Void rpcCall() throws Exception {
2838         this.master.deleteSnapshot(getRpcController(), DeleteSnapshotRequest.newBuilder()
2839           .setSnapshot(createHBaseProtosSnapshotDesc(snapshot)).build());
2840         return null;
2841       }
2842     });
2843   }
2844
2845   @Override
2846   public void deleteTableSnapshots(String tableNameRegex, String snapshotNameRegex)
2847       throws IOException {
2848     deleteTableSnapshots(Pattern.compile(tableNameRegex), Pattern.compile(snapshotNameRegex));
2849   }
2850
2851   @Override
2852   public void deleteTableSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern)
2853       throws IOException {
2854     List<SnapshotDescription> snapshots = listTableSnapshots(tableNamePattern, snapshotNamePattern);
2855     for (SnapshotDescription snapshot : snapshots) {
2856       try {
2857         internalDeleteSnapshot(snapshot);
2858         LOG.debug("Successfully deleted snapshot: " + snapshot.getName());
2859       } catch (IOException e) {
2860         LOG.error("Failed to delete snapshot: " + snapshot.getName(), e);
2861       }
2862     }
2863   }
2864
2865   @Override
2866   public void setQuota(final QuotaSettings quota) throws IOException {
2867     executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
2868       @Override
2869       protected Void rpcCall() throws Exception {
2870         this.master.setQuota(getRpcController(), QuotaSettings.buildSetQuotaRequestProto(quota));
2871         return null;
2872       }
2873     });
2874   }
2875
2876   @Override
2877   public QuotaRetriever getQuotaRetriever(final QuotaFilter filter) throws IOException {
2878     return QuotaRetriever.open(conf, filter);
2879   }
2880
2881   private <C extends RetryingCallable<V> & Closeable, V> V executeCallable(C callable)
2882       throws IOException {
2883     return executeCallable(callable, rpcCallerFactory, operationTimeout, rpcTimeout);
2884   }
2885
2886   static private <C extends RetryingCallable<V> & Closeable, V> V executeCallable(C callable,
2887              RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout, int rpcTimeout)
2888   throws IOException {
2889     RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller(rpcTimeout);
2890     try {
2891       return caller.callWithRetries(callable, operationTimeout);
2892     } finally {
2893       callable.close();
2894     }
2895   }
2896
2897   @Override
2898   public CoprocessorRpcChannel coprocessorService() {
2899     return new MasterCoprocessorRpcChannel(connection);
2900   }
2901
2902   /**
2903    * Simple {@link Abortable}, throwing RuntimeException on abort.
2904    */
2905   private static class ThrowableAbortable implements Abortable {
2906     @Override
2907     public void abort(String why, Throwable e) {
2908       throw new RuntimeException(why, e);
2909     }
2910
2911     @Override
2912     public boolean isAborted() {
2913       return true;
2914     }
2915   }
2916
2917   @Override
2918   public CoprocessorRpcChannel coprocessorService(ServerName sn) {
2919     return new RegionServerCoprocessorRpcChannel(connection, sn);
2920   }
2921
2922   @Override
2923   public void updateConfiguration(final ServerName server) throws IOException {
2924     final AdminService.BlockingInterface admin = this.connection.getAdmin(server);
2925     Callable<Void> callable = new Callable<Void>() {
2926       @Override
2927       public Void call() throws Exception {
2928         admin.updateConfiguration(null, UpdateConfigurationRequest.getDefaultInstance());
2929         return null;
2930       }
2931     };
2932     ProtobufUtil.call(callable);
2933   }
2934
2935   @Override
2936   public void updateConfiguration() throws IOException {
2937     for (ServerName server : this.getClusterStatus().getServers()) {
2938       updateConfiguration(server);
2939     }
2940   }
2941
2942   @Override
2943   public int getMasterInfoPort() throws IOException {
2944     // TODO: Fix!  Reaching into internal implementation!!!!
2945     ConnectionImplementation connection = (ConnectionImplementation)this.connection;
2946     ZooKeeperKeepAliveConnection zkw = connection.getKeepAliveZooKeeperWatcher();
2947     try {
2948       return MasterAddressTracker.getMasterInfoPort(zkw);
2949     } catch (KeeperException e) {
2950       throw new IOException("Failed to get master info port from MasterAddressTracker", e);
2951     }
2952   }
2953
2954   private ServerName getMasterAddress() throws IOException {
2955     // TODO: Fix!  Reaching into internal implementation!!!!
2956     ConnectionImplementation connection = (ConnectionImplementation)this.connection;
2957     ZooKeeperKeepAliveConnection zkw = connection.getKeepAliveZooKeeperWatcher();
2958     try {
2959       return MasterAddressTracker.getMasterAddress(zkw);
2960     } catch (KeeperException e) {
2961       throw new IOException("Failed to get master server name from MasterAddressTracker", e);
2962     }
2963   }
2964
2965   @Override
2966   public long getLastMajorCompactionTimestamp(final TableName tableName) throws IOException {
2967     return executeCallable(new MasterCallable<Long>(getConnection(), getRpcControllerFactory()) {
2968       @Override
2969       protected Long rpcCall() throws Exception {
2970         MajorCompactionTimestampRequest req =
2971             MajorCompactionTimestampRequest.newBuilder()
2972                 .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
2973         return master.getLastMajorCompactionTimestamp(getRpcController(), req).
2974             getCompactionTimestamp();
2975       }
2976     });
2977   }
2978
2979   @Override
2980   public long getLastMajorCompactionTimestampForRegion(final byte[] regionName) throws IOException {
2981     return executeCallable(new MasterCallable<Long>(getConnection(), getRpcControllerFactory()) {
2982       @Override
2983       protected Long rpcCall() throws Exception {
2984         MajorCompactionTimestampForRegionRequest req =
2985             MajorCompactionTimestampForRegionRequest.newBuilder().setRegion(RequestConverter
2986                       .buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)).build();
2987         return master.getLastMajorCompactionTimestampForRegion(getRpcController(), req)
2988             .getCompactionTimestamp();
2989       }
2990     });
2991   }
2992
2993   /**
2994    * {@inheritDoc}
2995    */
2996   @Override
2997   public void compact(final TableName tableName, final byte[] columnFamily, CompactType compactType)
2998     throws IOException, InterruptedException {
2999     compact(tableName, columnFamily, false, compactType);
3000   }
3001
3002   /**
3003    * {@inheritDoc}
3004    */
3005   @Override
3006   public void compact(final TableName tableName, CompactType compactType)
3007     throws IOException, InterruptedException {
3008     compact(tableName, null, false, compactType);
3009   }
3010
3011   /**
3012    * {@inheritDoc}
3013    */
3014   @Override
3015   public void majorCompact(final TableName tableName, final byte[] columnFamily,
3016     CompactType compactType) throws IOException, InterruptedException {
3017     compact(tableName, columnFamily, true, compactType);
3018   }
3019
3020   /**
3021    * {@inheritDoc}
3022    */
3023   @Override
3024   public void majorCompact(final TableName tableName, CompactType compactType)
3025           throws IOException, InterruptedException {
3026     compact(tableName, null, true, compactType);
3027   }
3028
3029   /**
3030    * {@inheritDoc}
3031    */
3032   @Override
3033   public CompactionState getCompactionState(final TableName tableName,
3034     CompactType compactType) throws IOException {
3035     AdminProtos.GetRegionInfoResponse.CompactionState state =
3036         AdminProtos.GetRegionInfoResponse.CompactionState.NONE;
3037     checkTableExists(tableName);
3038     // TODO: There is no timeout on this controller. Set one!
3039     final HBaseRpcController rpcController = rpcControllerFactory.newController();
3040     switch (compactType) {
3041       case MOB:
3042         final AdminProtos.AdminService.BlockingInterface masterAdmin =
3043           this.connection.getAdmin(getMasterAddress());
3044         Callable<AdminProtos.GetRegionInfoResponse.CompactionState> callable =
3045             new Callable<AdminProtos.GetRegionInfoResponse.CompactionState>() {
3046           @Override
3047           public AdminProtos.GetRegionInfoResponse.CompactionState call() throws Exception {
3048             HRegionInfo info = getMobRegionInfo(tableName);
3049             GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
3050                 info.getRegionName(), true);
3051             GetRegionInfoResponse response = masterAdmin.getRegionInfo(rpcController, request);
3052             return response.getCompactionState();
3053           }
3054         };
3055         state = ProtobufUtil.call(callable);
3056         break;
3057       case NORMAL:
3058       default:
3059         ZooKeeperWatcher zookeeper = null;
3060         try {
3061           List<Pair<HRegionInfo, ServerName>> pairs;
3062           if (TableName.META_TABLE_NAME.equals(tableName)) {
3063             zookeeper = new ZooKeeperWatcher(conf, ZK_IDENTIFIER_PREFIX + connection.toString(),
3064               new ThrowableAbortable());
3065             pairs = new MetaTableLocator().getMetaRegionsAndLocations(zookeeper);
3066           } else {
3067             pairs = MetaTableAccessor.getTableRegionsAndLocations(connection, tableName);
3068           }
3069           for (Pair<HRegionInfo, ServerName> pair: pairs) {
3070             if (pair.getFirst().isOffline()) continue;
3071             if (pair.getSecond() == null) continue;
3072             final ServerName sn = pair.getSecond();
3073             final byte [] regionName = pair.getFirst().getRegionName();
3074             final AdminService.BlockingInterface snAdmin = this.connection.getAdmin(sn);
3075             try {
3076               Callable<GetRegionInfoResponse> regionInfoCallable =
3077                   new Callable<GetRegionInfoResponse>() {
3078                 @Override
3079                 public GetRegionInfoResponse call() throws Exception {
3080                   GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
3081                       regionName, true);
3082                   return snAdmin.getRegionInfo(rpcController, request);
3083                 }
3084               };
3085               GetRegionInfoResponse response = ProtobufUtil.call(regionInfoCallable);
3086               switch (response.getCompactionState()) {
3087                 case MAJOR_AND_MINOR:
3088                   return CompactionState.MAJOR_AND_MINOR;
3089                 case MAJOR:
3090                   if (state == AdminProtos.GetRegionInfoResponse.CompactionState.MINOR) {
3091                     return CompactionState.MAJOR_AND_MINOR;
3092                   }
3093                   state = AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR;
3094                   break;
3095                 case MINOR:
3096                   if (state == AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR) {
3097                     return CompactionState.MAJOR_AND_MINOR;
3098                   }
3099                   state = AdminProtos.GetRegionInfoResponse.CompactionState.MINOR;
3100                   break;
3101                 case NONE:
3102                 default: // nothing, continue
3103               }
3104             } catch (NotServingRegionException e) {
3105               if (LOG.isDebugEnabled()) {
3106                 LOG.debug("Trying to get compaction state of " +
3107                         pair.getFirst() + ": " +
3108                         StringUtils.stringifyException(e));
3109               }
3110             } catch (RemoteException e) {
3111               if (e.getMessage().indexOf(NotServingRegionException.class.getName()) >= 0) {
3112                 if (LOG.isDebugEnabled()) {
3113                   LOG.debug("Trying to get compaction state of " + pair.getFirst() + ": "
3114                           + StringUtils.stringifyException(e));
3115                 }
3116               } else {
3117                 throw e;
3118               }
3119             }
3120           }
3121         } finally {
3122           if (zookeeper != null) {
3123             zookeeper.close();
3124           }
3125         }
3126         break;
3127     }
3128     if(state != null) {
3129       return ProtobufUtil.createCompactionState(state);
3130     }
3131     return null;
3132   }
3133
3134   /**
3135    * Future that waits on a procedure result.
3136    * Returned by the async version of the Admin calls,
3137    * and used internally by the sync calls to wait on the result of the procedure.
3138    */
3139   @InterfaceAudience.Private
3140   @InterfaceStability.Evolving
3141   protected static class ProcedureFuture<V> implements Future<V> {
3142     private ExecutionException exception = null;
3143     private boolean procResultFound = false;
3144     private boolean done = false;
3145     private boolean cancelled = false;
3146     private V result = null;
3147
3148     private final HBaseAdmin admin;
3149     private final Long procId;
3150
3151     public ProcedureFuture(final HBaseAdmin admin, final Long procId) {
3152       this.admin = admin;
3153       this.procId = procId;
3154     }
3155
3156     @Override
3157     public boolean cancel(boolean mayInterruptIfRunning) {
3158       AbortProcedureRequest abortProcRequest = AbortProcedureRequest.newBuilder()
3159           .setProcId(procId).setMayInterruptIfRunning(mayInterruptIfRunning).build();
3160       try {
3161         cancelled = abortProcedureResult(abortProcRequest).getIsProcedureAborted();
3162         if (cancelled) {
3163           done = true;
3164         }
3165       } catch (IOException e) {
3166         // Cancell thrown exception for some reason. At this time, we are not sure whether
3167         // the cancell succeeds or fails. We assume that it is failed, but print out a warning
3168         // for debugging purpose.
3169         LOG.warn(
3170           "Cancelling the procedure with procId=" + procId + " throws exception " + e.getMessage(),
3171           e);
3172         cancelled = false;
3173       }
3174       return cancelled;
3175     }
3176
3177     @Override
3178     public boolean isCancelled() {
3179       return cancelled;
3180     }
3181
3182     protected AbortProcedureResponse abortProcedureResult(
3183         final AbortProcedureRequest request) throws IOException {
3184       return admin.executeCallable(new MasterCallable<AbortProcedureResponse>(
3185           admin.getConnection(), admin.getRpcControllerFactory()) {
3186         @Override
3187         protected AbortProcedureResponse rpcCall() throws Exception {
3188           return master.abortProcedure(getRpcController(), request);
3189         }
3190       });
3191     }
3192
3193     @Override
3194     public V get() throws InterruptedException, ExecutionException {
3195       // TODO: should we ever spin forever?
3196       throw new UnsupportedOperationException();
3197     }
3198
3199     @Override
3200     public V get(long timeout, TimeUnit unit)
3201         throws InterruptedException, ExecutionException, TimeoutException {
3202       if (!done) {
3203         long deadlineTs = EnvironmentEdgeManager.currentTime() + unit.toMillis(timeout);
3204         try {
3205           try {
3206             // if the master support procedures, try to wait the result
3207             if (procId != null) {
3208               result = waitProcedureResult(procId, deadlineTs);
3209             }
3210             // if we don't have a proc result, try the compatibility wait
3211             if (!procResultFound) {
3212               result = waitOperationResult(deadlineTs);
3213             }
3214             result = postOperationResult(result, deadlineTs);
3215             done = true;
3216           } catch (IOException e) {
3217             result = postOperationFailure(e, deadlineTs);
3218             done = true;
3219           }
3220         } catch (IOException e) {
3221           exception = new ExecutionException(e);
3222           done = true;
3223         }
3224       }
3225       if (exception != null) {
3226         throw exception;
3227       }
3228       return result;
3229     }
3230
3231     @Override
3232     public boolean isDone() {
3233       return done;
3234     }
3235
3236     protected HBaseAdmin getAdmin() {
3237       return admin;
3238     }
3239
3240     private V waitProcedureResult(long procId, long deadlineTs)
3241         throws IOException, TimeoutException, InterruptedException {
3242       GetProcedureResultRequest request = GetProcedureResultRequest.newBuilder()
3243           .setProcId(procId)
3244           .build();
3245
3246       int tries = 0;
3247       IOException serviceEx = null;
3248       while (EnvironmentEdgeManager.currentTime() < deadlineTs) {
3249         GetProcedureResultResponse response = null;
3250         try {
3251           // Try to fetch the result
3252           response = getProcedureResult(request);
3253         } catch (IOException e) {
3254           serviceEx = unwrapException(e);
3255
3256           // the master may be down
3257           LOG.warn("failed to get the procedure result procId=" + procId, serviceEx);
3258
3259           // Not much to do, if we have a DoNotRetryIOException
3260           if (serviceEx instanceof DoNotRetryIOException) {
3261             // TODO: looks like there is no way to unwrap this exception and get the proper
3262             // UnsupportedOperationException aside from looking at the message.
3263             // anyway, if we fail here we just failover to the compatibility side
3264             // and that is always a valid solution.
3265             LOG.warn("Proc-v2 is unsupported on this master: " + serviceEx.getMessage(), serviceEx);
3266             procResultFound = false;
3267             return null;
3268           }
3269         }
3270
3271         // If the procedure is no longer running, we should have a result
3272         if (response != null && response.getState() != GetProcedureResultResponse.State.RUNNING) {
3273           procResultFound = response.getState() != GetProcedureResultResponse.State.NOT_FOUND;
3274           return convertResult(response);
3275         }
3276
3277         try {
3278           Thread.sleep(getAdmin().getPauseTime(tries++));
3279         } catch (InterruptedException e) {
3280           throw new InterruptedException(
3281             "Interrupted while waiting for the result of proc " + procId);
3282         }
3283       }
3284       if (serviceEx != null) {
3285         throw serviceEx;
3286       } else {
3287         throw new TimeoutException("The procedure " + procId + " is still running");
3288       }
3289     }
3290
3291     private static IOException unwrapException(IOException e) {
3292       if (e instanceof RemoteException) {
3293         return ((RemoteException)e).unwrapRemoteException();
3294       }
3295       return e;
3296     }
3297
3298     protected GetProcedureResultResponse getProcedureResult(final GetProcedureResultRequest request)
3299         throws IOException {
3300       return admin.executeCallable(new MasterCallable<GetProcedureResultResponse>(
3301           admin.getConnection(), admin.getRpcControllerFactory()) {
3302         @Override
3303         protected GetProcedureResultResponse rpcCall() throws Exception {
3304           return master.getProcedureResult(getRpcController(), request);
3305         }
3306       });
3307     }
3308
3309     /**
3310      * Convert the procedure result response to a specified type.
3311      * @param response the procedure result object to parse
3312      * @return the result data of the procedure.
3313      */
3314     protected V convertResult(final GetProcedureResultResponse response) throws IOException {
3315       if (response.hasException()) {
3316         throw ForeignExceptionUtil.toIOException(response.getException());
3317       }
3318       return null;
3319     }
3320
3321     /**
3322      * Fallback implementation in case the procedure is not supported by the server.
3323      * It should try to wait until the operation is completed.
3324      * @param deadlineTs the timestamp after which this method should throw a TimeoutException
3325      * @return the result data of the operation
3326      */
3327     protected V waitOperationResult(final long deadlineTs)
3328         throws IOException, TimeoutException {
3329       return null;
3330     }
3331
3332     /**
3333      * Called after the operation is completed and the result fetched. this allows to perform extra
3334      * steps after the procedure is completed. it allows to apply transformations to the result that
3335      * will be returned by get().
3336      * @param result the result of the procedure
3337      * @param deadlineTs the timestamp after which this method should throw a TimeoutException
3338      * @return the result of the procedure, which may be the same as the passed one
3339      */
3340     protected V postOperationResult(final V result, final long deadlineTs)
3341         throws IOException, TimeoutException {
3342       return result;
3343     }
3344
3345     /**
3346      * Called after the operation is terminated with a failure.
3347      * this allows to perform extra steps after the procedure is terminated.
3348      * it allows to apply transformations to the result that will be returned by get().
3349      * The default implementation will rethrow the exception
3350      * @param exception the exception got from fetching the result
3351      * @param deadlineTs the timestamp after which this method should throw a TimeoutException
3352      * @return the result of the procedure, which may be the same as the passed one
3353      */
3354     protected V postOperationFailure(final IOException exception, final long deadlineTs)
3355         throws IOException, TimeoutException {
3356       throw exception;
3357     }
3358
3359     protected interface WaitForStateCallable {
3360       boolean checkState(int tries) throws IOException;
3361       void throwInterruptedException() throws InterruptedIOException;
3362       void throwTimeoutException(long elapsed) throws TimeoutException;
3363     }
3364
3365     protected void waitForState(final long deadlineTs, final WaitForStateCallable callable)
3366         throws IOException, TimeoutException {
3367       int tries = 0;
3368       IOException serverEx = null;
3369       long startTime = EnvironmentEdgeManager.currentTime();
3370       while (EnvironmentEdgeManager.currentTime() < deadlineTs) {
3371         serverEx = null;
3372         try {
3373           if (callable.checkState(tries)) {
3374             return;
3375           }
3376         } catch (IOException e) {
3377           serverEx = e;
3378         }
3379         try {
3380           Thread.sleep(getAdmin().getPauseTime(tries++));
3381         } catch (InterruptedException e) {
3382           callable.throwInterruptedException();
3383         }
3384       }
3385       if (serverEx != null) {
3386         throw unwrapException(serverEx);
3387       } else {
3388         callable.throwTimeoutException(EnvironmentEdgeManager.currentTime() - startTime);
3389       }
3390     }
3391   }
3392
3393   @InterfaceAudience.Private
3394   @InterfaceStability.Evolving
3395   protected static abstract class TableFuture<V> extends ProcedureFuture<V> {
3396     private final TableName tableName;
3397
3398     public TableFuture(final HBaseAdmin admin, final TableName tableName, final Long procId) {
3399       super(admin, procId);
3400       this.tableName = tableName;
3401     }
3402
3403     @Override
3404     public String toString() {
3405       return getDescription();
3406     }
3407
3408     /**
3409      * @return the table name
3410      */
3411     protected TableName getTableName() {
3412       return tableName;
3413     }
3414
3415     /**
3416      * @return the table descriptor
3417      */
3418     protected HTableDescriptor getTableDescriptor() throws IOException {
3419       return getAdmin().getTableDescriptorByTableName(getTableName());
3420     }
3421
3422     /**
3423      * @return the operation type like CREATE, DELETE, DISABLE etc.
3424      */
3425     public abstract String getOperationType();
3426
3427     /**
3428      * @return a description of the operation
3429      */
3430     protected String getDescription() {
3431       return "Operation: " + getOperationType() + ", "
3432           + "Table Name: " + tableName.getNameWithNamespaceInclAsString();
3433
3434     };
3435
3436     protected abstract class TableWaitForStateCallable implements WaitForStateCallable {
3437       @Override
3438       public void throwInterruptedException() throws InterruptedIOException {
3439         throw new InterruptedIOException("Interrupted while waiting for operation: "
3440             + getOperationType() + " on table: " + tableName.getNameWithNamespaceInclAsString());
3441       }
3442
3443       @Override
3444       public void throwTimeoutException(long elapsedTime) throws TimeoutException {
3445         throw new TimeoutException("The operation: " + getOperationType() + " on table: " +
3446             tableName.getNameAsString() + " has not completed after " + elapsedTime + "ms");
3447       }
3448     }
3449
3450     @Override
3451     protected V postOperationResult(final V result, final long deadlineTs)
3452         throws IOException, TimeoutException {
3453       LOG.info(getDescription() + " completed");
3454       return super.postOperationResult(result, deadlineTs);
3455     }
3456
3457     @Override
3458     protected V postOperationFailure(final IOException exception, final long deadlineTs)
3459         throws IOException, TimeoutException {
3460       LOG.info(getDescription() + " failed with " + exception.getMessage());
3461       return super.postOperationFailure(exception, deadlineTs);
3462     }
3463
3464     protected void waitForTableEnabled(final long deadlineTs)
3465         throws IOException, TimeoutException {
3466       waitForState(deadlineTs, new TableWaitForStateCallable() {
3467         @Override
3468         public boolean checkState(int tries) throws IOException {
3469           try {
3470             if (getAdmin().isTableAvailable(tableName)) {
3471               return true;
3472             }
3473           } catch (TableNotFoundException tnfe) {
3474             LOG.debug("Table " + tableName.getNameWithNamespaceInclAsString()
3475                 + " was not enabled, sleeping. tries=" + tries);
3476           }
3477           return false;
3478         }
3479       });
3480     }
3481
3482     protected void waitForTableDisabled(final long deadlineTs)
3483         throws IOException, TimeoutException {
3484       waitForState(deadlineTs, new TableWaitForStateCallable() {
3485         @Override
3486         public boolean checkState(int tries) throws IOException {
3487           return getAdmin().isTableDisabled(tableName);
3488         }
3489       });
3490     }
3491
3492     protected void waitTableNotFound(final long deadlineTs)
3493         throws IOException, TimeoutException {
3494       waitForState(deadlineTs, new TableWaitForStateCallable() {
3495         @Override
3496         public boolean checkState(int tries) throws IOException {
3497           return !getAdmin().tableExists(tableName);
3498         }
3499       });
3500     }
3501
3502     protected void waitForSchemaUpdate(final long deadlineTs)
3503         throws IOException, TimeoutException {
3504       waitForState(deadlineTs, new TableWaitForStateCallable() {
3505         @Override
3506         public boolean checkState(int tries) throws IOException {
3507           return getAdmin().getAlterStatus(tableName).getFirst() == 0;
3508         }
3509       });
3510     }
3511
3512     protected void waitForAllRegionsOnline(final long deadlineTs, final byte[][] splitKeys)
3513         throws IOException, TimeoutException {
3514       final HTableDescriptor desc = getTableDescriptor();
3515       final AtomicInteger actualRegCount = new AtomicInteger(0);
3516       final MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
3517         @Override
3518         public boolean visit(Result rowResult) throws IOException {
3519           RegionLocations list = MetaTableAccessor.getRegionLocations(rowResult);
3520           if (list == null) {
3521             LOG.warn("No serialized HRegionInfo in " + rowResult);
3522             return true;
3523           }
3524           HRegionLocation l = list.getRegionLocation();
3525           if (l == null) {
3526             return true;
3527           }
3528           if (!l.getRegionInfo().getTable().equals(desc.getTableName())) {
3529             return false;
3530           }
3531           if (l.getRegionInfo().isOffline() || l.getRegionInfo().isSplit()) return true;
3532           HRegionLocation[] locations = list.getRegionLocations();
3533           for (HRegionLocation location : locations) {
3534             if (location == null) continue;
3535             ServerName serverName = location.getServerName();
3536             // Make sure that regions are assigned to server
3537             if (serverName != null && serverName.getHostAndPort() != null) {
3538               actualRegCount.incrementAndGet();
3539             }
3540           }
3541           return true;
3542         }
3543       };
3544
3545       int tries = 0;
3546       int numRegs = (splitKeys == null ? 1 : splitKeys.length + 1) * desc.getRegionReplication();
3547       while (EnvironmentEdgeManager.currentTime() < deadlineTs) {
3548         actualRegCount.set(0);
3549         MetaTableAccessor.scanMetaForTableRegions(getAdmin().getConnection(), visitor,
3550           desc.getTableName());
3551         if (actualRegCount.get() == numRegs) {
3552           // all the regions are online
3553           return;
3554         }
3555
3556         try {
3557           Thread.sleep(getAdmin().getPauseTime(tries++));
3558         } catch (InterruptedException e) {
3559           throw new InterruptedIOException("Interrupted when opening" + " regions; "
3560               + actualRegCount.get() + " of " + numRegs + " regions processed so far");
3561         }
3562       }
3563       throw new TimeoutException("Only " + actualRegCount.get() + " of " + numRegs
3564           + " regions are online; retries exhausted.");
3565     }
3566   }
3567
3568   @InterfaceAudience.Private
3569   @InterfaceStability.Evolving
3570   protected static abstract class NamespaceFuture extends ProcedureFuture<Void> {
3571     private final String namespaceName;
3572
3573     public NamespaceFuture(final HBaseAdmin admin, final String namespaceName, final Long procId) {
3574       super(admin, procId);
3575       this.namespaceName = namespaceName;
3576     }
3577
3578     /**
3579      * @return the namespace name
3580      */
3581     protected String getNamespaceName() {
3582       return namespaceName;
3583     }
3584
3585     /**
3586      * @return the operation type like CREATE_NAMESPACE, DELETE_NAMESPACE, etc.
3587      */
3588     public abstract String getOperationType();
3589
3590     @Override
3591     public String toString() {
3592       return "Operation: " + getOperationType() + ", Namespace: " + getNamespaceName();
3593     }
3594   }
3595
3596   @Override
3597   public List<SecurityCapability> getSecurityCapabilities() throws IOException {
3598     try {
3599       return executeCallable(new MasterCallable<List<SecurityCapability>>(getConnection(),
3600           getRpcControllerFactory()) {
3601         @Override
3602         protected List<SecurityCapability> rpcCall() throws Exception {
3603           SecurityCapabilitiesRequest req = SecurityCapabilitiesRequest.newBuilder().build();
3604           return ProtobufUtil.toSecurityCapabilityList(
3605             master.getSecurityCapabilities(getRpcController(), req).getCapabilitiesList());
3606         }
3607       });
3608     } catch (IOException e) {
3609       if (e instanceof RemoteException) {
3610         e = ((RemoteException)e).unwrapRemoteException();
3611       }
3612       throw e;
3613     }
3614   }
3615
3616   @Override
3617   public boolean[] setSplitOrMergeEnabled(final boolean enabled, final boolean synchronous,
3618                                           final MasterSwitchType... switchTypes)
3619     throws IOException {
3620     return executeCallable(new MasterCallable<boolean[]>(getConnection(),
3621         getRpcControllerFactory()) {
3622       @Override
3623       protected boolean[] rpcCall() throws Exception {
3624         MasterProtos.SetSplitOrMergeEnabledResponse response =
3625             master.setSplitOrMergeEnabled(getRpcController(),
3626                 RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous,
3627                     switchTypes));
3628         boolean[] result = new boolean[switchTypes.length];
3629         int i = 0;
3630         for (Boolean prevValue : response.getPrevValueList()) {
3631           result[i++] = prevValue;
3632         }
3633         return result;
3634       }
3635     });
3636   }
3637
3638   @Override
3639   public boolean isSplitOrMergeEnabled(final MasterSwitchType switchType) throws IOException {
3640     return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
3641       @Override
3642       protected Boolean rpcCall() throws Exception {
3643         return master.isSplitOrMergeEnabled(getRpcController(),
3644           RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType)).getEnabled();
3645       }
3646     });
3647   }
3648
3649   private HRegionInfo getMobRegionInfo(TableName tableName) {
3650     return new HRegionInfo(tableName, Bytes.toBytes(".mob"),
3651             HConstants.EMPTY_END_ROW, false, 0);
3652   }
3653
3654   private RpcControllerFactory getRpcControllerFactory() {
3655     return this.rpcControllerFactory;
3656   }
3657 }