View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import com.google.common.annotations.VisibleForTesting;
22  import com.google.protobuf.ByteString;
23  import com.google.protobuf.ServiceException;
24
25  import java.io.Closeable;
26  import java.io.IOException;
27  import java.io.InterruptedIOException;
28  import java.util.ArrayList;
29  import java.util.Arrays;
30  import java.util.HashMap;
31  import java.util.LinkedList;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.Map.Entry;
35  import java.util.concurrent.ExecutionException;
36  import java.util.concurrent.Future;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.TimeoutException;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.concurrent.atomic.AtomicReference;
41  import java.util.regex.Pattern;
42
43  import org.apache.commons.logging.Log;
44  import org.apache.commons.logging.LogFactory;
45  import org.apache.hadoop.conf.Configuration;
46  import org.apache.hadoop.hbase.Abortable;
47  import org.apache.hadoop.hbase.ClusterStatus;
48  import org.apache.hadoop.hbase.DoNotRetryIOException;
49  import org.apache.hadoop.hbase.HBaseConfiguration;
50  import org.apache.hadoop.hbase.HColumnDescriptor;
51  import org.apache.hadoop.hbase.HConstants;
52  import org.apache.hadoop.hbase.HRegionInfo;
53  import org.apache.hadoop.hbase.HRegionLocation;
54  import org.apache.hadoop.hbase.HTableDescriptor;
55  import org.apache.hadoop.hbase.MasterNotRunningException;
56  import org.apache.hadoop.hbase.MetaTableAccessor;
57  import org.apache.hadoop.hbase.NamespaceDescriptor;
58  import org.apache.hadoop.hbase.NotServingRegionException;
59  import org.apache.hadoop.hbase.ProcedureInfo;
60  import org.apache.hadoop.hbase.ProcedureUtil;
61  import org.apache.hadoop.hbase.RegionLocations;
62  import org.apache.hadoop.hbase.ServerName;
63  import org.apache.hadoop.hbase.TableExistsException;
64  import org.apache.hadoop.hbase.TableName;
65  import org.apache.hadoop.hbase.TableNotDisabledException;
66  import org.apache.hadoop.hbase.TableNotFoundException;
67  import org.apache.hadoop.hbase.UnknownRegionException;
68  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
69  import org.apache.hadoop.hbase.classification.InterfaceAudience;
70  import org.apache.hadoop.hbase.classification.InterfaceStability;
71  import org.apache.hadoop.hbase.client.security.SecurityCapability;
72  import org.apache.hadoop.hbase.exceptions.DeserializationException;
73  import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
74  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
75  import org.apache.hadoop.hbase.ipc.MasterCoprocessorRpcChannel;
76  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
77  import org.apache.hadoop.hbase.ipc.RegionServerCoprocessorRpcChannel;
78  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
79  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
80  import org.apache.hadoop.hbase.protobuf.RequestConverter;
81  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
82  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
83  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
84  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
85  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
86  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
87  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
88  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
89  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
90  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
91  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
92  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
93  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
94  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
95  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
96  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
97  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema;
98  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
99  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AbortProcedureRequest;
100 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AbortProcedureResponse;
101 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
102 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
103 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
104 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest;
105 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceResponse;
106 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest;
107 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableResponse;
108 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnRequest;
109 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnResponse;
110 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
111 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
112 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
113 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableRequest;
114 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableResponse;
115 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableRequest;
116 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableResponse;
117 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest;
118 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsResponse;
119 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableRequest;
120 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableResponse;
121 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest;
122 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse;
123 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusRequest;
124 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
125 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
126 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultRequest;
127 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultResponse;
128 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
129 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
130 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
131 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
132 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesRequest;
133 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
134 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
135 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
136 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
137 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
138 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListProceduresRequest;
139 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
140 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
141 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
142 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
143 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnRequest;
144 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnResponse;
145 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
146 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
147 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableRequest;
148 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableResponse;
149 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionRequest;
150 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
151 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
152 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
153 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
154 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
155 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownRequest;
156 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest;
157 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse;
158 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterRequest;
159 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableRequest;
160 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableResponse;
161 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest;
162 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
163 import org.apache.hadoop.hbase.quotas.QuotaFilter;
164 import org.apache.hadoop.hbase.quotas.QuotaRetriever;
165 import org.apache.hadoop.hbase.quotas.QuotaSettings;
166 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
167 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
168 import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
169 import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
170 import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
171 import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
172 import org.apache.hadoop.hbase.util.Addressing;
173 import org.apache.hadoop.hbase.util.Bytes;
174 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
175 import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
176 import org.apache.hadoop.hbase.util.Pair;
177 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
178 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
179 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
180 import org.apache.hadoop.ipc.RemoteException;
181 import org.apache.hadoop.util.StringUtils;
182 import org.apache.zookeeper.KeeperException;
183
184 /**
185  * HBaseAdmin is no longer a client API. It is marked InterfaceAudience.Private indicating that
186  * this is an HBase-internal class as defined in
187  * https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html
188  * There are no guarantees for backwards source / binary compatibility and methods or class can
189  * change or go away without deprecation.
190  * Use {@link Connection#getAdmin()} to obtain an instance of {@link Admin} instead of constructing
191  * an HBaseAdmin directly.
192  *
193  * <p>Connection should be an <i>unmanaged</i> connection obtained via
194  * {@link ConnectionFactory#createConnection(Configuration)}
195  *
196  * @see ConnectionFactory
197  * @see Connection
198  * @see Admin
199  */
200 @InterfaceAudience.Private
201 @InterfaceStability.Evolving
202 public class HBaseAdmin implements Admin {
203   private static final Log LOG = LogFactory.getLog(HBaseAdmin.class);
204
205   private static final String ZK_IDENTIFIER_PREFIX =  "hbase-admin-on-";
206
207   private ClusterConnection connection;
208
209   private volatile Configuration conf;
210   private final long pause;
211   private final int numRetries;
212   // Some operations can take a long time such as disable of big table.
213   // numRetries is for 'normal' stuff... Multiply by this factor when
214   // want to wait a long time.
215   private final int retryLongerMultiplier;
216   private final int syncWaitTimeout;
217   private boolean aborted;
218   private int operationTimeout;
219   private int rpcTimeout;
220
221   private RpcRetryingCallerFactory rpcCallerFactory;
222   private RpcControllerFactory rpcControllerFactory;
223
224   private NonceGenerator ng;
225
226   @Override
227   public int getOperationTimeout() {
228     return operationTimeout;
229   }
230
231   HBaseAdmin(ClusterConnection connection) throws IOException {
232     this.conf = connection.getConfiguration();
233     this.connection = connection;
234
235     // TODO: receive ConnectionConfiguration here rather than re-parsing these configs every time.
236     this.pause = this.conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
237         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
238     this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
239         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
240     this.retryLongerMultiplier = this.conf.getInt(
241         "hbase.client.retries.longer.multiplier", 10);
242     this.operationTimeout = this.conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
243         HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
244     this.rpcTimeout = this.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
245         HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
246     this.syncWaitTimeout = this.conf.getInt(
247       "hbase.client.sync.wait.timeout.msec", 10 * 60000); // 10min
248
249     this.rpcCallerFactory = connection.getRpcRetryingCallerFactory();
250     this.rpcControllerFactory = connection.getRpcControllerFactory();
251
252     this.ng = this.connection.getNonceGenerator();
253   }
254
255   @Override
256   public void abort(String why, Throwable e) {
257     // Currently does nothing but throw the passed message and exception
258     this.aborted = true;
259     throw new RuntimeException(why, e);
260   }
261
262   @Override
263   public boolean isAborted(){
264     return this.aborted;
265   }
266
267   @Override
268   public boolean abortProcedure(final long procId, final boolean mayInterruptIfRunning)
269   throws IOException {
270     return get(abortProcedureAsync(procId, mayInterruptIfRunning), this.syncWaitTimeout,
271       TimeUnit.MILLISECONDS);
272   }
273
274   @Override
275   public Future<Boolean> abortProcedureAsync(
276       final long procId,
277       final boolean mayInterruptIfRunning) throws IOException {
278     Boolean abortProcResponse = executeCallable(
279       new MasterCallable<AbortProcedureResponse>(getConnection()) {
280         @Override
281         public AbortProcedureResponse call(int callTimeout) throws ServiceException {
282           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
283           controller.setCallTimeout(callTimeout);
284           AbortProcedureRequest abortProcRequest =
285               AbortProcedureRequest.newBuilder().setProcId(procId).build();
286           return master.abortProcedure(controller, abortProcRequest);
287         }
288       }).getIsProcedureAborted();
289
290     AbortProcedureFuture abortProcFuture =
291         new AbortProcedureFuture(this, procId, abortProcResponse);
292     return abortProcFuture;
293   }
294
295   private static class AbortProcedureFuture extends ProcedureFuture<Boolean> {
296     private boolean isAbortInProgress;
297
298     public AbortProcedureFuture(
299         final HBaseAdmin admin,
300         final Long procId,
301         final Boolean abortProcResponse) {
302       super(admin, procId);
303       this.isAbortInProgress = abortProcResponse;
304     }
305
306     @Override
307     public Boolean get(long timeout, TimeUnit unit)
308         throws InterruptedException, ExecutionException, TimeoutException {
309       if (!this.isAbortInProgress) {
310         return false;
311       }
312       super.get(timeout, unit);
313       return true;
314     }
315   }
316
317   /** @return Connection used by this object. */
318   @Override
319   public Connection getConnection() {
320     return connection;
321   }
322
323   @Override
324   public boolean tableExists(final TableName tableName) throws IOException {
325     return executeCallable(new ConnectionCallable<Boolean>(getConnection()) {
326       @Override
327       public Boolean call(int callTimeout) throws ServiceException, IOException {
328         return MetaTableAccessor.tableExists(connection, tableName);
329       }
330     });
331   }
332
333   @Override
334   public HTableDescriptor[] listTables() throws IOException {
335     return listTables((Pattern)null, false);
336   }
337
338   @Override
339   public HTableDescriptor[] listTables(Pattern pattern) throws IOException {
340     return listTables(pattern, false);
341   }
342
343   @Override
344   public HTableDescriptor[] listTables(String regex) throws IOException {
345     return listTables(Pattern.compile(regex), false);
346   }
347
348   @Override
349   public HTableDescriptor[] listTables(final Pattern pattern, final boolean includeSysTables)
350       throws IOException {
351     return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) {
352       @Override
353       public HTableDescriptor[] call(int callTimeout) throws ServiceException {
354         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
355         controller.setCallTimeout(callTimeout);
356         GetTableDescriptorsRequest req =
357             RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables);
358         return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(controller, req));
359       }
360     });
361   }
362
363   @Override
364   public HTableDescriptor[] listTables(String regex, boolean includeSysTables)
365       throws IOException {
366     return listTables(Pattern.compile(regex), includeSysTables);
367   }
368
369   @Override
370   public TableName[] listTableNames() throws IOException {
371     return listTableNames((Pattern)null, false);
372   }
373
374   @Override
375   public TableName[] listTableNames(Pattern pattern) throws IOException {
376     return listTableNames(pattern, false);
377   }
378
379   @Override
380   public TableName[] listTableNames(String regex) throws IOException {
381     return listTableNames(Pattern.compile(regex), false);
382   }
383
384   @Override
385   public TableName[] listTableNames(final Pattern pattern, final boolean includeSysTables)
386       throws IOException {
387     return executeCallable(new MasterCallable<TableName[]>(getConnection()) {
388       @Override
389       public TableName[] call(int callTimeout) throws ServiceException {
390         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
391         controller.setCallTimeout(callTimeout);
392         GetTableNamesRequest req =
393             RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables);
394         return ProtobufUtil.getTableNameArray(master.getTableNames(controller, req)
395             .getTableNamesList());
396       }
397     });
398   }
399
400   @Override
401   public TableName[] listTableNames(final String regex, final boolean includeSysTables)
402       throws IOException {
403     return listTableNames(Pattern.compile(regex), includeSysTables);
404   }
405
406   @Override
407   public HTableDescriptor getTableDescriptor(final TableName tableName) throws IOException {
408     return getTableDescriptor(tableName, getConnection(), rpcCallerFactory, rpcControllerFactory,
409        operationTimeout, rpcTimeout);
410   }
411
412   static HTableDescriptor getTableDescriptor(final TableName tableName, Connection connection,
413       RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory,
414       int operationTimeout, int rpcTimeout) throws IOException {
415       if (tableName == null) return null;
416       HTableDescriptor htd = executeCallable(new MasterCallable<HTableDescriptor>(connection) {
417         @Override
418         public HTableDescriptor call(int callTimeout) throws ServiceException {
419           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
420           controller.setCallTimeout(callTimeout);
421           GetTableDescriptorsResponse htds;
422           GetTableDescriptorsRequest req =
423                   RequestConverter.buildGetTableDescriptorsRequest(tableName);
424           htds = master.getTableDescriptors(controller, req);
425
426           if (!htds.getTableSchemaList().isEmpty()) {
427             return ProtobufUtil.convertToHTableDesc(htds.getTableSchemaList().get(0));
428           }
429           return null;
430         }
431       }, rpcCallerFactory, operationTimeout, rpcTimeout);
432       if (htd != null) {
433         return htd;
434       }
435       throw new TableNotFoundException(tableName.getNameAsString());
436   }
437
438   private long getPauseTime(int tries) {
439     int triesCount = tries;
440     if (triesCount >= HConstants.RETRY_BACKOFF.length) {
441       triesCount = HConstants.RETRY_BACKOFF.length - 1;
442     }
443     return this.pause * HConstants.RETRY_BACKOFF[triesCount];
444   }
445
446   @Override
447   public void createTable(HTableDescriptor desc)
448   throws IOException {
449     createTable(desc, null);
450   }
451
452   @Override
453   public void createTable(HTableDescriptor desc, byte [] startKey,
454       byte [] endKey, int numRegions)
455   throws IOException {
456     if(numRegions < 3) {
457       throw new IllegalArgumentException("Must create at least three regions");
458     } else if(Bytes.compareTo(startKey, endKey) >= 0) {
459       throw new IllegalArgumentException("Start key must be smaller than end key");
460     }
461     if (numRegions == 3) {
462       createTable(desc, new byte[][]{startKey, endKey});
463       return;
464     }
465     byte [][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
466     if(splitKeys == null || splitKeys.length != numRegions - 1) {
467       throw new IllegalArgumentException("Unable to split key range into enough regions");
468     }
469     createTable(desc, splitKeys);
470   }
471
472   @Override
473   public void createTable(final HTableDescriptor desc, byte [][] splitKeys)
474       throws IOException {
475     get(createTableAsync(desc, splitKeys), syncWaitTimeout, TimeUnit.MILLISECONDS);
476   }
477
478   @Override
479   public Future<Void> createTableAsync(final HTableDescriptor desc, final byte[][] splitKeys)
480       throws IOException {
481     if (desc.getTableName() == null) {
482       throw new IllegalArgumentException("TableName cannot be null");
483     }
484     if (splitKeys != null && splitKeys.length > 0) {
485       Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
486       // Verify there are no duplicate split keys
487       byte[] lastKey = null;
488       for (byte[] splitKey : splitKeys) {
489         if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
490           throw new IllegalArgumentException(
491               "Empty split key must not be passed in the split keys.");
492         }
493         if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
494           throw new IllegalArgumentException("All split keys must be unique, " +
495             "found duplicate: " + Bytes.toStringBinary(splitKey) +
496             ", " + Bytes.toStringBinary(lastKey));
497         }
498         lastKey = splitKey;
499       }
500     }
501
502     CreateTableResponse response = executeCallable(
503       new MasterCallable<CreateTableResponse>(getConnection()) {
504         @Override
505         public CreateTableResponse call(int callTimeout) throws ServiceException {
506           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
507           controller.setCallTimeout(callTimeout);
508           controller.setPriority(desc.getTableName());
509           CreateTableRequest request = RequestConverter.buildCreateTableRequest(
510             desc, splitKeys, ng.getNonceGroup(), ng.newNonce());
511           return master.createTable(controller, request);
512         }
513       });
514     return new CreateTableFuture(this, desc, splitKeys, response);
515   }
516
517   private static class CreateTableFuture extends TableFuture<Void> {
518     private final HTableDescriptor desc;
519     private final byte[][] splitKeys;
520
521     public CreateTableFuture(final HBaseAdmin admin, final HTableDescriptor desc,
522         final byte[][] splitKeys, final CreateTableResponse response) {
523       super(admin, desc.getTableName(),
524               (response != null && response.hasProcId()) ? response.getProcId() : null);
525       this.splitKeys = splitKeys;
526       this.desc = desc;
527     }
528
529     @Override
530     protected HTableDescriptor getTableDescriptor() {
531       return desc;
532     }
533
534     @Override
535     public String getOperationType() {
536       return "CREATE";
537     }
538
539     @Override
540     protected Void waitOperationResult(final long deadlineTs) throws IOException, TimeoutException {
541       waitForTableEnabled(deadlineTs);
542       waitForAllRegionsOnline(deadlineTs, splitKeys);
543       return null;
544     }
545   }
546
547   @Override
548   public void deleteTable(final TableName tableName) throws IOException {
549     get(deleteTableAsync(tableName), syncWaitTimeout, TimeUnit.MILLISECONDS);
550   }
551
552   @Override
553   public Future<Void> deleteTableAsync(final TableName tableName) throws IOException {
554     DeleteTableResponse response = executeCallable(
555       new MasterCallable<DeleteTableResponse>(getConnection()) {
556         @Override
557         public DeleteTableResponse call(int callTimeout) throws ServiceException {
558           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
559           controller.setCallTimeout(callTimeout);
560           controller.setPriority(tableName);
561           DeleteTableRequest req =
562               RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(),ng.newNonce());
563           return master.deleteTable(controller,req);
564         }
565       });
566     return new DeleteTableFuture(this, tableName, response);
567   }
568
569   private static class DeleteTableFuture extends TableFuture<Void> {
570     public DeleteTableFuture(final HBaseAdmin admin, final TableName tableName,
571         final DeleteTableResponse response) {
572       super(admin, tableName,
573               (response != null && response.hasProcId()) ? response.getProcId() : null);
574     }
575
576     @Override
577     public String getOperationType() {
578       return "DELETE";
579     }
580
581     @Override
582     protected Void waitOperationResult(final long deadlineTs)
583         throws IOException, TimeoutException {
584       waitTableNotFound(deadlineTs);
585       return null;
586     }
587
588     @Override
589     protected Void postOperationResult(final Void result, final long deadlineTs)
590         throws IOException, TimeoutException {
591       // Delete cached information to prevent clients from using old locations
592       ((ClusterConnection) getAdmin().getConnection()).clearRegionCache(getTableName());
593       return super.postOperationResult(result, deadlineTs);
594     }
595   }
596
597   @Override
598   public HTableDescriptor[] deleteTables(String regex) throws IOException {
599     return deleteTables(Pattern.compile(regex));
600   }
601
602   /**
603    * Delete tables matching the passed in pattern and wait on completion.
604    *
605    * Warning: Use this method carefully, there is no prompting and the effect is
606    * immediate. Consider using {@link #listTables(java.util.regex.Pattern) } and
607    * {@link #deleteTable(TableName)}
608    *
609    * @param pattern The pattern to match table names against
610    * @return Table descriptors for tables that couldn't be deleted
611    * @throws IOException
612    */
613   @Override
614   public HTableDescriptor[] deleteTables(Pattern pattern) throws IOException {
615     List<HTableDescriptor> failed = new LinkedList<HTableDescriptor>();
616     for (HTableDescriptor table : listTables(pattern)) {
617       try {
618         deleteTable(table.getTableName());
619       } catch (IOException ex) {
620         LOG.info("Failed to delete table " + table.getTableName(), ex);
621         failed.add(table);
622       }
623     }
624     return failed.toArray(new HTableDescriptor[failed.size()]);
625   }
626
627   @Override
628   public void truncateTable(final TableName tableName, final boolean preserveSplits)
629       throws IOException {
630     get(truncateTableAsync(tableName, preserveSplits), syncWaitTimeout, TimeUnit.MILLISECONDS);
631   }
632
633   @Override
634   public Future<Void> truncateTableAsync(final TableName tableName, final boolean preserveSplits)
635       throws IOException {
636     TruncateTableResponse response =
637         executeCallable(new MasterCallable<TruncateTableResponse>(getConnection()) {
638           @Override
639           public TruncateTableResponse call(int callTimeout) throws ServiceException {
640             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
641             controller.setCallTimeout(callTimeout);
642             controller.setPriority(tableName);
643             LOG.info("Started truncating " + tableName);
644             TruncateTableRequest req = RequestConverter.buildTruncateTableRequest(
645               tableName, preserveSplits, ng.getNonceGroup(), ng.newNonce());
646             return master.truncateTable(controller, req);
647           }
648         });
649     return new TruncateTableFuture(this, tableName, preserveSplits, response);
650   }
651
652   private static class TruncateTableFuture extends TableFuture<Void> {
653     private final boolean preserveSplits;
654
655     public TruncateTableFuture(final HBaseAdmin admin, final TableName tableName,
656         final boolean preserveSplits, final TruncateTableResponse response) {
657       super(admin, tableName,
658              (response != null && response.hasProcId()) ? response.getProcId() : null);
659       this.preserveSplits = preserveSplits;
660     }
661
662     @Override
663     public String getOperationType() {
664       return "TRUNCATE";
665     }
666
667     @Override
668     protected Void waitOperationResult(final long deadlineTs) throws IOException, TimeoutException {
669       waitForTableEnabled(deadlineTs);
670       // once the table is enabled, we know the operation is done. so we can fetch the splitKeys
671       byte[][] splitKeys = preserveSplits ? getAdmin().getTableSplits(getTableName()) : null;
672       waitForAllRegionsOnline(deadlineTs, splitKeys);
673       return null;
674     }
675   }
676
677   private byte[][] getTableSplits(final TableName tableName) throws IOException {
678     byte[][] splits = null;
679     try (RegionLocator locator = getConnection().getRegionLocator(tableName)) {
680       byte[][] startKeys = locator.getStartKeys();
681       if (startKeys.length == 1) {
682         return splits;
683       }
684       splits = new byte[startKeys.length - 1][];
685       for (int i = 1; i < startKeys.length; i++) {
686         splits[i - 1] = startKeys[i];
687       }
688     }
689     return splits;
690   }
691
692   @Override
693   public void enableTable(final TableName tableName)
694   throws IOException {
695     get(enableTableAsync(tableName), syncWaitTimeout, TimeUnit.MILLISECONDS);
696   }
697
698   @Override
699   public Future<Void> enableTableAsync(final TableName tableName) throws IOException {
700     TableName.isLegalFullyQualifiedTableName(tableName.getName());
701     EnableTableResponse response = executeCallable(
702       new MasterCallable<EnableTableResponse>(getConnection()) {
703         @Override
704         public EnableTableResponse call(int callTimeout) throws ServiceException {
705           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
706           controller.setCallTimeout(callTimeout);
707           controller.setPriority(tableName);
708
709           LOG.info("Started enable of " + tableName);
710           EnableTableRequest req =
711               RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(),ng.newNonce());
712           return master.enableTable(controller,req);
713         }
714       });
715     return new EnableTableFuture(this, tableName, response);
716   }
717
718   private static class EnableTableFuture extends TableFuture<Void> {
719     public EnableTableFuture(final HBaseAdmin admin, final TableName tableName,
720         final EnableTableResponse response) {
721       super(admin, tableName,
722               (response != null && response.hasProcId()) ? response.getProcId() : null);
723     }
724
725     @Override
726     public String getOperationType() {
727       return "ENABLE";
728     }
729
730     @Override
731     protected Void waitOperationResult(final long deadlineTs) throws IOException, TimeoutException {
732       waitForTableEnabled(deadlineTs);
733       return null;
734     }
735   }
736
737   @Override
738   public HTableDescriptor[] enableTables(String regex) throws IOException {
739     return enableTables(Pattern.compile(regex));
740   }
741
742   @Override
743   public HTableDescriptor[] enableTables(Pattern pattern) throws IOException {
744     List<HTableDescriptor> failed = new LinkedList<HTableDescriptor>();
745     for (HTableDescriptor table : listTables(pattern)) {
746       if (isTableDisabled(table.getTableName())) {
747         try {
748           enableTable(table.getTableName());
749         } catch (IOException ex) {
750           LOG.info("Failed to enable table " + table.getTableName(), ex);
751           failed.add(table);
752         }
753       }
754     }
755     return failed.toArray(new HTableDescriptor[failed.size()]);
756   }
757
758   @Override
759   public void disableTable(final TableName tableName)
760   throws IOException {
761     get(disableTableAsync(tableName), syncWaitTimeout, TimeUnit.MILLISECONDS);
762   }
763
764   @Override
765   public Future<Void> disableTableAsync(final TableName tableName) throws IOException {
766     TableName.isLegalFullyQualifiedTableName(tableName.getName());
767     DisableTableResponse response = executeCallable(
768       new MasterCallable<DisableTableResponse>(getConnection()) {
769         @Override
770         public DisableTableResponse call(int callTimeout) throws ServiceException {
771           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
772           controller.setCallTimeout(callTimeout);
773           controller.setPriority(tableName);
774
775           LOG.info("Started disable of " + tableName);
776           DisableTableRequest req =
777               RequestConverter.buildDisableTableRequest(
778                 tableName, ng.getNonceGroup(), ng.newNonce());
779           return master.disableTable(controller, req);
780         }
781       });
782     return new DisableTableFuture(this, tableName, response);
783   }
784
785   private static class DisableTableFuture extends TableFuture<Void> {
786     public DisableTableFuture(final HBaseAdmin admin, final TableName tableName,
787         final DisableTableResponse response) {
788       super(admin, tableName,
789               (response != null && response.hasProcId()) ? response.getProcId() : null);
790     }
791
792     @Override
793     public String getOperationType() {
794       return "DISABLE";
795     }
796
797     @Override
798     protected Void waitOperationResult(long deadlineTs) throws IOException, TimeoutException {
799       waitForTableDisabled(deadlineTs);
800       return null;
801     }
802   }
803
804   @Override
805   public HTableDescriptor[] disableTables(String regex) throws IOException {
806     return disableTables(Pattern.compile(regex));
807   }
808
809   @Override
810   public HTableDescriptor[] disableTables(Pattern pattern) throws IOException {
811     List<HTableDescriptor> failed = new LinkedList<HTableDescriptor>();
812     for (HTableDescriptor table : listTables(pattern)) {
813       if (isTableEnabled(table.getTableName())) {
814         try {
815           disableTable(table.getTableName());
816         } catch (IOException ex) {
817           LOG.info("Failed to disable table " + table.getTableName(), ex);
818           failed.add(table);
819         }
820       }
821     }
822     return failed.toArray(new HTableDescriptor[failed.size()]);
823   }
824
825   @Override
826   public boolean isTableEnabled(final TableName tableName) throws IOException {
827     checkTableExists(tableName);
828     return executeCallable(new ConnectionCallable<Boolean>(getConnection()) {
829       @Override
830       public Boolean call(int callTimeout) throws ServiceException, IOException {
831         TableState tableState = MetaTableAccessor.getTableState(connection, tableName);
832         if (tableState == null)
833           throw new TableNotFoundException(tableName);
834         return tableState.inStates(TableState.State.ENABLED);
835       }
836     });
837   }
838
839   @Override
840   public boolean isTableDisabled(TableName tableName) throws IOException {
841     checkTableExists(tableName);
842     return connection.isTableDisabled(tableName);
843   }
844
845   @Override
846   public boolean isTableAvailable(TableName tableName) throws IOException {
847     return connection.isTableAvailable(tableName, null);
848   }
849
850   @Override
851   public boolean isTableAvailable(TableName tableName, byte[][] splitKeys) throws IOException {
852     return connection.isTableAvailable(tableName, splitKeys);
853   }
854
855   @Override
856   public Pair<Integer, Integer> getAlterStatus(final TableName tableName) throws IOException {
857     return executeCallable(new MasterCallable<Pair<Integer, Integer>>(getConnection()) {
858       @Override
859       public Pair<Integer, Integer> call(int callTimeout) throws ServiceException {
860         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
861         controller.setCallTimeout(callTimeout);
862         controller.setPriority(tableName);
863
864         GetSchemaAlterStatusRequest req = RequestConverter
865             .buildGetSchemaAlterStatusRequest(tableName);
866         GetSchemaAlterStatusResponse ret = master.getSchemaAlterStatus(controller, req);
867         Pair<Integer, Integer> pair = new Pair<>(ret.getYetToUpdateRegions(),
868             ret.getTotalRegions());
869         return pair;
870       }
871     });
872   }
873
874   @Override
875   public Pair<Integer, Integer> getAlterStatus(final byte[] tableName) throws IOException {
876     return getAlterStatus(TableName.valueOf(tableName));
877   }
878
879   /**
880    * {@inheritDoc}
881    * @deprecated Since 2.0. Will be removed in 3.0. Use
882    *     {@link #addColumnFamily(TableName, HColumnDescriptor)} instead.
883    */
884   @Override
885   @Deprecated
886   public void addColumn(final TableName tableName, final HColumnDescriptor columnFamily)
887   throws IOException {
888     addColumnFamily(tableName, columnFamily);
889   }
890
891   @Override
892   public Future<Void> addColumnFamily(final TableName tableName,
893       final HColumnDescriptor columnFamily) throws IOException {
894     AddColumnResponse response =
895         executeCallable(new MasterCallable<AddColumnResponse>(getConnection()) {
896           @Override
897           public AddColumnResponse call(int callTimeout) throws ServiceException {
898             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
899             controller.setCallTimeout(callTimeout);
900             controller.setPriority(tableName);
901
902             AddColumnRequest req =
903                 RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
904                   ng.newNonce());
905             return master.addColumn(controller, req);
906           }
907         });
908     return new AddColumnFamilyFuture(this, tableName, response);
909   }
910
911   private static class AddColumnFamilyFuture extends ModifyTableFuture {
912     public AddColumnFamilyFuture(final HBaseAdmin admin, final TableName tableName,
913         final AddColumnResponse response) {
914       super(admin, tableName, (response != null && response.hasProcId()) ? response.getProcId()
915           : null);
916     }
917
918     @Override
919     public String getOperationType() {
920       return "ADD_COLUMN_FAMILY";
921     }
922   }
923
924   /**
925    * {@inheritDoc}
926    * @deprecated Since 2.0. Will be removed in 3.0. Use
927    *     {@link #deleteColumnFamily(TableName, byte[])} instead.
928    */
929   @Override
930   @Deprecated
931   public void deleteColumn(final TableName tableName, final byte[] columnFamily)
932   throws IOException {
933     deleteColumnFamily(tableName, columnFamily);
934   }
935
936   @Override
937   public Future<Void> deleteColumnFamily(final TableName tableName, final byte[] columnFamily)
938       throws IOException {
939     DeleteColumnResponse response =
940         executeCallable(new MasterCallable<DeleteColumnResponse>(getConnection()) {
941           @Override
942           public DeleteColumnResponse call(int callTimeout) throws ServiceException {
943             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
944             controller.setCallTimeout(callTimeout);
945             controller.setPriority(tableName);
946
947             DeleteColumnRequest req =
948                 RequestConverter.buildDeleteColumnRequest(tableName, columnFamily,
949                   ng.getNonceGroup(), ng.newNonce());
950             master.deleteColumn(controller, req);
951             return null;
952           }
953         });
954     return new DeleteColumnFamilyFuture(this, tableName, response);
955   }
956
957   private static class DeleteColumnFamilyFuture extends ModifyTableFuture {
958     public DeleteColumnFamilyFuture(final HBaseAdmin admin, final TableName tableName,
959         final DeleteColumnResponse response) {
960       super(admin, tableName, (response != null && response.hasProcId()) ? response.getProcId()
961           : null);
962     }
963
964     @Override
965     public String getOperationType() {
966       return "DELETE_COLUMN_FAMILY";
967     }
968   }
969
970   /**
971    * {@inheritDoc}
972    * @deprecated As of 2.0. Will be removed in 3.0. Use
973    *     {@link #modifyColumnFamily(TableName, HColumnDescriptor)} instead.
974    */
975   @Override
976   @Deprecated
977   public void modifyColumn(final TableName tableName, final HColumnDescriptor columnFamily)
978   throws IOException {
979     modifyColumnFamily(tableName, columnFamily);
980   }
981
982   @Override
983   public Future<Void> modifyColumnFamily(final TableName tableName,
984       final HColumnDescriptor columnFamily) throws IOException {
985     ModifyColumnResponse response =
986         executeCallable(new MasterCallable<ModifyColumnResponse>(getConnection()) {
987           @Override
988           public ModifyColumnResponse call(int callTimeout) throws ServiceException {
989             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
990             controller.setCallTimeout(callTimeout);
991             controller.setPriority(tableName);
992
993             ModifyColumnRequest req =
994                 RequestConverter.buildModifyColumnRequest(tableName, columnFamily,
995                   ng.getNonceGroup(), ng.newNonce());
996             master.modifyColumn(controller, req);
997             return null;
998           }
999         });
1000     return new ModifyColumnFamilyFuture(this, tableName, response);
1001   }
1002
1003   private static class ModifyColumnFamilyFuture extends ModifyTableFuture {
1004     public ModifyColumnFamilyFuture(final HBaseAdmin admin, final TableName tableName,
1005         final ModifyColumnResponse response) {
1006       super(admin, tableName, (response != null && response.hasProcId()) ? response.getProcId()
1007           : null);
1008     }
1009
1010     @Override
1011     public String getOperationType() {
1012       return "MODIFY_COLUMN_FAMILY";
1013     }
1014   }
1015
1016   @Override
1017   public void closeRegion(final String regionname, final String serverName) throws IOException {
1018     closeRegion(Bytes.toBytes(regionname), serverName);
1019   }
1020
1021   @Override
1022   public void closeRegion(final byte [] regionname, final String serverName) throws IOException {
1023     if (serverName != null) {
1024       Pair<HRegionInfo, ServerName> pair = MetaTableAccessor.getRegion(connection, regionname);
1025       if (pair == null || pair.getFirst() == null) {
1026         throw new UnknownRegionException(Bytes.toStringBinary(regionname));
1027       } else {
1028         closeRegion(ServerName.valueOf(serverName), pair.getFirst());
1029       }
1030     } else {
1031       Pair<HRegionInfo, ServerName> pair = MetaTableAccessor.getRegion(connection, regionname);
1032       if (pair == null) {
1033         throw new UnknownRegionException(Bytes.toStringBinary(regionname));
1034       } else if (pair.getSecond() == null) {
1035         throw new NoServerForRegionException(Bytes.toStringBinary(regionname));
1036       } else {
1037         closeRegion(pair.getSecond(), pair.getFirst());
1038       }
1039     }
1040   }
1041
1042   @Override
1043   public boolean closeRegionWithEncodedRegionName(final String encodedRegionName,
1044       final String serverName) throws IOException {
1045     if (null == serverName || ("").equals(serverName.trim())) {
1046       throw new IllegalArgumentException(
1047           "The servername cannot be null or empty.");
1048     }
1049     ServerName sn = ServerName.valueOf(serverName);
1050     AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
1051     // Close the region without updating zk state.
1052     CloseRegionRequest request =
1053       RequestConverter.buildCloseRegionRequest(sn, encodedRegionName);
1054     try {
1055       PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1056
1057       // TODO: this does not do retries, it should. Set priority and timeout in controller
1058       CloseRegionResponse response = admin.closeRegion(controller, request);
1059       boolean isRegionClosed = response.getClosed();
1060       if (false == isRegionClosed) {
1061         LOG.error("Not able to close the region " + encodedRegionName + ".");
1062       }
1063       return isRegionClosed;
1064     } catch (ServiceException se) {
1065       throw ProtobufUtil.getRemoteException(se);
1066     }
1067   }
1068
1069   @Override
1070   public void closeRegion(final ServerName sn, final HRegionInfo hri) throws IOException {
1071     AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
1072     PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1073
1074     // Close the region without updating zk state.
1075     ProtobufUtil.closeRegion(controller, admin, sn, hri.getRegionName());
1076   }
1077
1078   @Override
1079   public List<HRegionInfo> getOnlineRegions(final ServerName sn) throws IOException {
1080     AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
1081     PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1082     return ProtobufUtil.getOnlineRegions(controller, admin);
1083   }
1084
1085   @Override
1086   public void flush(final TableName tableName) throws IOException {
1087     checkTableExists(tableName);
1088     if (isTableDisabled(tableName)) {
1089       LOG.info("Table is disabled: " + tableName.getNameAsString());
1090       return;
1091     }
1092     execProcedure("flush-table-proc", tableName.getNameAsString(),
1093       new HashMap<String, String>());
1094   }
1095
1096   @Override
1097   public void flushRegion(final byte[] regionName) throws IOException {
1098     Pair<HRegionInfo, ServerName> regionServerPair = getRegion(regionName);
1099     if (regionServerPair == null) {
1100       throw new IllegalArgumentException("Unknown regionname: " + Bytes.toStringBinary(regionName));
1101     }
1102     if (regionServerPair.getSecond() == null) {
1103       throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
1104     }
1105     HRegionInfo hRegionInfo = regionServerPair.getFirst();
1106     ServerName serverName = regionServerPair.getSecond();
1107
1108     PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1109
1110     AdminService.BlockingInterface admin = this.connection.getAdmin(serverName);
1111     FlushRegionRequest request =
1112         RequestConverter.buildFlushRegionRequest(hRegionInfo.getRegionName());
1113     try {
1114       // TODO: this does not do retries, it should. Set priority and timeout in controller
1115       admin.flushRegion(controller, request);
1116     } catch (ServiceException se) {
1117       throw ProtobufUtil.getRemoteException(se);
1118     }
1119   }
1120
1121   /**
1122    * {@inheritDoc}
1123    */
1124   @Override
1125   public void compact(final TableName tableName)
1126     throws IOException {
1127     compact(tableName, null, false, CompactType.NORMAL);
1128   }
1129
1130   @Override
1131   public void compactRegion(final byte[] regionName)
1132     throws IOException {
1133     compactRegion(regionName, null, false);
1134   }
1135
1136   /**
1137    * {@inheritDoc}
1138    */
1139   @Override
1140   public void compact(final TableName tableName, final byte[] columnFamily)
1141     throws IOException {
1142     compact(tableName, columnFamily, false, CompactType.NORMAL);
1143   }
1144
1145   /**
1146    * {@inheritDoc}
1147    */
1148   @Override
1149   public void compactRegion(final byte[] regionName, final byte[] columnFamily)
1150     throws IOException {
1151     compactRegion(regionName, columnFamily, false);
1152   }
1153
1154   /**
1155    * {@inheritDoc}
1156    */
1157   @Override
1158   public void compactRegionServer(final ServerName sn, boolean major)
1159   throws IOException, InterruptedException {
1160     for (HRegionInfo region : getOnlineRegions(sn)) {
1161       compact(sn, region, major, null);
1162     }
1163   }
1164
1165   @Override
1166   public void majorCompact(final TableName tableName)
1167   throws IOException {
1168     compact(tableName, null, true, CompactType.NORMAL);
1169   }
1170
1171   @Override
1172   public void majorCompactRegion(final byte[] regionName)
1173   throws IOException {
1174     compactRegion(regionName, null, true);
1175   }
1176
1177   /**
1178    * {@inheritDoc}
1179    */
1180   @Override
1181   public void majorCompact(final TableName tableName, final byte[] columnFamily)
1182   throws IOException {
1183     compact(tableName, columnFamily, true, CompactType.NORMAL);
1184   }
1185
1186   @Override
1187   public void majorCompactRegion(final byte[] regionName, final byte[] columnFamily)
1188   throws IOException {
1189     compactRegion(regionName, columnFamily, true);
1190   }
1191
1192   /**
1193    * Compact a table.
1194    * Asynchronous operation.
1195    *
1196    * @param tableName table or region to compact
1197    * @param columnFamily column family within a table or region
1198    * @param major True if we are to do a major compaction.
1199    * @throws IOException if a remote or network exception occurs
1200    * @throws InterruptedException
1201    */
1202   private void compact(final TableName tableName, final byte[] columnFamily,final boolean major,
1203                        CompactType compactType) throws IOException {
1204     switch (compactType) {
1205       case MOB:
1206         ServerName master = getMasterAddress();
1207         compact(master, getMobRegionInfo(tableName), major, columnFamily);
1208         break;
1209       case NORMAL:
1210       default:
1211         ZooKeeperWatcher zookeeper = null;
1212         try {
1213           checkTableExists(tableName);
1214           zookeeper = new ZooKeeperWatcher(conf, ZK_IDENTIFIER_PREFIX + connection.toString(),
1215                   new ThrowableAbortable());
1216           List<Pair<HRegionInfo, ServerName>> pairs;
1217           if (TableName.META_TABLE_NAME.equals(tableName)) {
1218             pairs = new MetaTableLocator().getMetaRegionsAndLocations(zookeeper);
1219           } else {
1220             pairs = MetaTableAccessor.getTableRegionsAndLocations(connection, tableName);
1221           }
1222           for (Pair<HRegionInfo, ServerName> pair: pairs) {
1223             if (pair.getFirst().isOffline()) continue;
1224             if (pair.getSecond() == null) continue;
1225             try {
1226               compact(pair.getSecond(), pair.getFirst(), major, columnFamily);
1227             } catch (NotServingRegionException e) {
1228               if (LOG.isDebugEnabled()) {
1229                 LOG.debug("Trying to" + (major ? " major" : "") + " compact " +
1230                         pair.getFirst() + ": " +
1231                         StringUtils.stringifyException(e));
1232               }
1233             }
1234           }
1235         } finally {
1236           if (zookeeper != null) {
1237             zookeeper.close();
1238           }
1239         }
1240         break;
1241     }
1242   }
1243
1244   /**
1245    * Compact an individual region.
1246    * Asynchronous operation.
1247    *
1248    * @param regionName region to compact
1249    * @param columnFamily column family within a table or region
1250    * @param major True if we are to do a major compaction.
1251    * @throws IOException if a remote or network exception occurs
1252    * @throws InterruptedException
1253    */
1254   private void compactRegion(final byte[] regionName, final byte[] columnFamily,final boolean major)
1255   throws IOException {
1256     Pair<HRegionInfo, ServerName> regionServerPair = getRegion(regionName);
1257     if (regionServerPair == null) {
1258       throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName));
1259     }
1260     if (regionServerPair.getSecond() == null) {
1261       throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
1262     }
1263     compact(regionServerPair.getSecond(), regionServerPair.getFirst(), major, columnFamily);
1264   }
1265
1266   private void compact(final ServerName sn, final HRegionInfo hri,
1267       final boolean major, final byte [] family)
1268   throws IOException {
1269     PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1270     AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
1271     CompactRegionRequest request =
1272       RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family);
1273     try {
1274       // TODO: this does not do retries, it should. Set priority and timeout in controller
1275       admin.compactRegion(controller, request);
1276     } catch (ServiceException se) {
1277       throw ProtobufUtil.getRemoteException(se);
1278     }
1279   }
1280
1281   @Override
1282   public void move(final byte [] encodedRegionName, final byte [] destServerName)
1283       throws IOException {
1284
1285     executeCallable(new MasterCallable<Void>(getConnection()) {
1286       @Override
1287       public Void call(int callTimeout) throws ServiceException {
1288         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1289         controller.setCallTimeout(callTimeout);
1290         // Hard to know the table name, at least check if meta
1291         if (isMetaRegion(encodedRegionName)) {
1292           controller.setPriority(TableName.META_TABLE_NAME);
1293         }
1294
1295         try {
1296           MoveRegionRequest request =
1297               RequestConverter.buildMoveRegionRequest(encodedRegionName, destServerName);
1298             master.moveRegion(controller, request);
1299         } catch (DeserializationException de) {
1300           LOG.error("Could not parse destination server name: " + de);
1301           throw new ServiceException(new DoNotRetryIOException(de));
1302         }
1303         return null;
1304       }
1305     });
1306   }
1307
1308   private boolean isMetaRegion(final byte[] regionName) {
1309     return Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getRegionName())
1310         || Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
1311   }
1312
1313   @Override
1314   public void assign(final byte[] regionName) throws MasterNotRunningException,
1315       ZooKeeperConnectionException, IOException {
1316     final byte[] toBeAssigned = getRegionName(regionName);
1317     executeCallable(new MasterCallable<Void>(getConnection()) {
1318       @Override
1319       public Void call(int callTimeout) throws ServiceException {
1320         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1321         controller.setCallTimeout(callTimeout);
1322         // Hard to know the table name, at least check if meta
1323         if (isMetaRegion(regionName)) {
1324           controller.setPriority(TableName.META_TABLE_NAME);
1325         }
1326
1327         AssignRegionRequest request =
1328           RequestConverter.buildAssignRegionRequest(toBeAssigned);
1329         master.assignRegion(controller,request);
1330         return null;
1331       }
1332     });
1333   }
1334
1335   @Override
1336   public void unassign(final byte [] regionName, final boolean force)
1337   throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
1338     final byte[] toBeUnassigned = getRegionName(regionName);
1339     executeCallable(new MasterCallable<Void>(getConnection()) {
1340       @Override
1341       public Void call(int callTimeout) throws ServiceException {
1342         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1343         controller.setCallTimeout(callTimeout);
1344         // Hard to know the table name, at least check if meta
1345         if (isMetaRegion(regionName)) {
1346           controller.setPriority(TableName.META_TABLE_NAME);
1347         }
1348         UnassignRegionRequest request =
1349           RequestConverter.buildUnassignRegionRequest(toBeUnassigned, force);
1350         master.unassignRegion(controller, request);
1351         return null;
1352       }
1353     });
1354   }
1355
1356   @Override
1357   public void offline(final byte [] regionName)
1358   throws IOException {
1359     executeCallable(new MasterCallable<Void>(getConnection()) {
1360       @Override
1361       public Void call(int callTimeout) throws ServiceException {
1362         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1363         controller.setCallTimeout(callTimeout);
1364         // Hard to know the table name, at least check if meta
1365         if (isMetaRegion(regionName)) {
1366           controller.setPriority(TableName.META_TABLE_NAME);
1367         }
1368         master.offlineRegion(controller, RequestConverter.buildOfflineRegionRequest(regionName));
1369         return null;
1370       }
1371     });
1372   }
1373
1374   @Override
1375   public boolean setBalancerRunning(final boolean on, final boolean synchronous)
1376   throws IOException {
1377     return executeCallable(new MasterCallable<Boolean>(getConnection()) {
1378       @Override
1379       public Boolean call(int callTimeout) throws ServiceException {
1380         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1381         controller.setCallTimeout(callTimeout);
1382
1383         SetBalancerRunningRequest req =
1384             RequestConverter.buildSetBalancerRunningRequest(on, synchronous);
1385         return master.setBalancerRunning(controller, req).getPrevBalanceValue();
1386       }
1387     });
1388   }
1389
1390   @Override
1391   public boolean balancer() throws IOException {
1392     return executeCallable(new MasterCallable<Boolean>(getConnection()) {
1393       @Override
1394       public Boolean call(int callTimeout) throws ServiceException {
1395         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1396         controller.setCallTimeout(callTimeout);
1397
1398         return master.balance(controller,
1399           RequestConverter.buildBalanceRequest(false)).getBalancerRan();
1400       }
1401     });
1402   }
1403
1404   @Override
1405   public boolean balancer(final boolean force) throws IOException {
1406     return executeCallable(new MasterCallable<Boolean>(getConnection()) {
1407       @Override
1408       public Boolean call(int callTimeout) throws ServiceException {
1409         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1410         controller.setCallTimeout(callTimeout);
1411
1412         return master.balance(controller,
1413           RequestConverter.buildBalanceRequest(force)).getBalancerRan();
1414       }
1415     });
1416   }
1417
1418   @Override
1419   public boolean isBalancerEnabled() throws IOException {
1420     return executeCallable(new MasterCallable<Boolean>(getConnection()) {
1421       @Override
1422       public Boolean call(int callTimeout) throws ServiceException {
1423         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1424         controller.setCallTimeout(callTimeout);
1425
1426         return master.isBalancerEnabled(controller,
1427           RequestConverter.buildIsBalancerEnabledRequest()).getEnabled();
1428       }
1429     });
1430   }
1431
1432   @Override
1433   public boolean normalize() throws IOException {
1434     return executeCallable(new MasterCallable<Boolean>(getConnection()) {
1435       @Override
1436       public Boolean call(int callTimeout) throws ServiceException {
1437         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1438         controller.setCallTimeout(callTimeout);
1439
1440         return master.normalize(controller,
1441           RequestConverter.buildNormalizeRequest()).getNormalizerRan();
1442       }
1443     });
1444   }
1445
1446   @Override
1447   public boolean isNormalizerEnabled() throws IOException {
1448     return executeCallable(new MasterCallable<Boolean>(getConnection()) {
1449       @Override
1450       public Boolean call(int callTimeout) throws ServiceException {
1451         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1452         controller.setCallTimeout(callTimeout);
1453
1454         return master.isNormalizerEnabled(controller,
1455           RequestConverter.buildIsNormalizerEnabledRequest()).getEnabled();
1456       }
1457     });
1458   }
1459
1460   @Override
1461   public boolean setNormalizerRunning(final boolean on) throws IOException {
1462     return executeCallable(new MasterCallable<Boolean>(getConnection()) {
1463       @Override
1464       public Boolean call(int callTimeout) throws ServiceException {
1465         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1466         controller.setCallTimeout(callTimeout);
1467
1468         SetNormalizerRunningRequest req =
1469           RequestConverter.buildSetNormalizerRunningRequest(on);
1470         return master.setNormalizerRunning(controller, req).getPrevNormalizerValue();
1471       }
1472     });
1473   }
1474
1475   @Override
1476   public boolean enableCatalogJanitor(final boolean enable) throws IOException {
1477     return executeCallable(new MasterCallable<Boolean>(getConnection()) {
1478       @Override
1479       public Boolean call(int callTimeout) throws ServiceException {
1480         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1481         controller.setCallTimeout(callTimeout);
1482
1483         return master.enableCatalogJanitor(controller,
1484           RequestConverter.buildEnableCatalogJanitorRequest(enable)).getPrevValue();
1485       }
1486     });
1487   }
1488
1489   @Override
1490   public int runCatalogScan() throws IOException {
1491     return executeCallable(new MasterCallable<Integer>(getConnection()) {
1492       @Override
1493       public Integer call(int callTimeout) throws ServiceException {
1494         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1495         controller.setCallTimeout(callTimeout);
1496
1497         return master.runCatalogScan(controller,
1498           RequestConverter.buildCatalogScanRequest()).getScanResult();
1499       }
1500     });
1501   }
1502
1503   @Override
1504   public boolean isCatalogJanitorEnabled() throws IOException {
1505     return executeCallable(new MasterCallable<Boolean>(getConnection()) {
1506       @Override
1507       public Boolean call(int callTimeout) throws ServiceException {
1508         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1509         controller.setCallTimeout(callTimeout);
1510
1511         return master.isCatalogJanitorEnabled(controller,
1512           RequestConverter.buildIsCatalogJanitorEnabledRequest()).getValue();
1513       }
1514     });
1515   }
1516
1517   private boolean isEncodedRegionName(byte[] regionName) throws IOException {
1518     try {
1519       HRegionInfo.parseRegionName(regionName);
1520       return false;
1521     } catch (IOException e) {
1522       if (StringUtils.stringifyException(e)
1523         .contains(HRegionInfo.INVALID_REGION_NAME_FORMAT_MESSAGE)) {
1524         return true;
1525       }
1526       throw e;
1527     }
1528   }
1529
1530   /**
1531    * Merge two regions. Synchronous operation.
1532    * Note: It is not feasible to predict the length of merge.
1533    *   Therefore, this is for internal testing only.
1534    * @param nameOfRegionA encoded or full name of region a
1535    * @param nameOfRegionB encoded or full name of region b
1536    * @param forcible true if do a compulsory merge, otherwise we will only merge
1537    *          two adjacent regions
1538    * @throws IOException
1539    */
1540   @VisibleForTesting
1541   public void mergeRegionsSync(
1542       final byte[] nameOfRegionA,
1543       final byte[] nameOfRegionB,
1544       final boolean forcible) throws IOException {
1545     get(
1546       mergeRegionsAsync(nameOfRegionA, nameOfRegionB, forcible),
1547       syncWaitTimeout,
1548       TimeUnit.MILLISECONDS);
1549   }
1550
1551   /**
1552    * Merge two regions. Asynchronous operation.
1553    * @param nameOfRegionA encoded or full name of region a
1554    * @param nameOfRegionB encoded or full name of region b
1555    * @param forcible true if do a compulsory merge, otherwise we will only merge
1556    *          two adjacent regions
1557    * @throws IOException
1558    * @deprecated Since 2.0. Will be removed in 3.0. Use
1559    *     {@link #mergeRegionsAsync(byte[], byte[], boolean)} instead.
1560    */
1561   @Deprecated
1562   @Override
1563   public void mergeRegions(final byte[] nameOfRegionA,
1564       final byte[] nameOfRegionB, final boolean forcible)
1565       throws IOException {
1566     mergeRegionsAsync(nameOfRegionA, nameOfRegionB, forcible);
1567   }
1568
1569   /**
1570    * Merge two regions. Asynchronous operation.
1571    * @param nameOfRegionA encoded or full name of region a
1572    * @param nameOfRegionB encoded or full name of region b
1573    * @param forcible true if do a compulsory merge, otherwise we will only merge
1574    *          two adjacent regions
1575    * @throws IOException
1576    */
1577   @Override
1578   public Future<Void> mergeRegionsAsync(
1579       final byte[] nameOfRegionA,
1580       final byte[] nameOfRegionB,
1581       final boolean forcible) throws IOException {
1582
1583     final byte[] encodedNameOfRegionA = isEncodedRegionName(nameOfRegionA) ?
1584       nameOfRegionA : HRegionInfo.encodeRegionName(nameOfRegionA).getBytes();
1585     final byte[] encodedNameOfRegionB = isEncodedRegionName(nameOfRegionB) ?
1586       nameOfRegionB : HRegionInfo.encodeRegionName(nameOfRegionB).getBytes();
1587
1588     TableName tableName;
1589     Pair<HRegionInfo, ServerName> pair = getRegion(nameOfRegionA);
1590
1591     if (pair != null) {
1592       if (pair.getFirst().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1593         throw new IllegalArgumentException ("Can't invoke merge on non-default regions directly");
1594       }
1595       tableName = pair.getFirst().getTable();
1596     } else {
1597       throw new UnknownRegionException (
1598         "Can't invoke merge on unknown region " + Bytes.toStringBinary(encodedNameOfRegionA));
1599     }
1600
1601     pair = getRegion(nameOfRegionB);
1602     if (pair != null) {
1603       if (pair.getFirst().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1604         throw new IllegalArgumentException ("Can't invoke merge on non-default regions directly");
1605       }
1606
1607       if (!tableName.equals(pair.getFirst().getTable())) {
1608         throw new IllegalArgumentException ("Cannot merge regions from two different tables " +
1609           tableName + " and " + pair.getFirst().getTable());
1610       }
1611     } else {
1612       throw new UnknownRegionException (
1613         "Can't invoke merge on unknown region " + Bytes.toStringBinary(encodedNameOfRegionB));
1614     }
1615
1616     DispatchMergingRegionsResponse response =
1617       executeCallable(new MasterCallable<DispatchMergingRegionsResponse>(getConnection()) {
1618       @Override
1619       public DispatchMergingRegionsResponse call(int callTimeout) throws ServiceException {
1620         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1621         controller.setCallTimeout(callTimeout);
1622
1623         try {
1624           DispatchMergingRegionsRequest request = RequestConverter
1625               .buildDispatchMergingRegionsRequest(
1626                 encodedNameOfRegionA,
1627                 encodedNameOfRegionB,
1628                 forcible,
1629                 ng.getNonceGroup(),
1630                 ng.newNonce());
1631           return master.dispatchMergingRegions(controller, request);
1632         } catch (DeserializationException de) {
1633           LOG.error("Could not parse destination server name: " + de);
1634           throw new ServiceException(new DoNotRetryIOException(de));
1635         }
1636       }
1637     });
1638     return new DispatchMergingRegionsFuture(this, tableName, response);
1639   }
1640
1641   private static class DispatchMergingRegionsFuture extends TableFuture<Void> {
1642     public DispatchMergingRegionsFuture(
1643         final HBaseAdmin admin,
1644         final TableName tableName,
1645         final DispatchMergingRegionsResponse response) {
1646       super(admin, tableName,
1647           (response != null && response.hasProcId()) ? response.getProcId() : null);
1648     }
1649
1650     public DispatchMergingRegionsFuture(
1651         final HBaseAdmin admin,
1652         final TableName tableName,
1653         final Long procId) {
1654       super(admin, tableName, procId);
1655     }
1656
1657     @Override
1658     public String getOperationType() {
1659       return "MERGE_REGIONS";
1660     }
1661   }
1662
1663   @Override
1664   public void split(final TableName tableName) throws IOException {
1665     split(tableName, null);
1666   }
1667
1668   @Override
1669   public void splitRegion(final byte[] regionName) throws IOException {
1670     splitRegion(regionName, null);
1671   }
1672
1673   /**
1674    * {@inheritDoc}
1675    */
1676   @Override
1677   public void split(final TableName tableName, final byte [] splitPoint) throws IOException {
1678     ZooKeeperWatcher zookeeper = null;
1679     try {
1680       checkTableExists(tableName);
1681       zookeeper = new ZooKeeperWatcher(conf, ZK_IDENTIFIER_PREFIX + connection.toString(),
1682         new ThrowableAbortable());
1683       List<Pair<HRegionInfo, ServerName>> pairs;
1684       if (TableName.META_TABLE_NAME.equals(tableName)) {
1685         pairs = new MetaTableLocator().getMetaRegionsAndLocations(zookeeper);
1686       } else {
1687         pairs = MetaTableAccessor.getTableRegionsAndLocations(connection, tableName);
1688       }
1689       for (Pair<HRegionInfo, ServerName> pair: pairs) {
1690         // May not be a server for a particular row
1691         if (pair.getSecond() == null) continue;
1692         HRegionInfo r = pair.getFirst();
1693         // check for parents
1694         if (r.isSplitParent()) continue;
1695         // if a split point given, only split that particular region
1696         if (r.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
1697            (splitPoint != null && !r.containsRow(splitPoint))) continue;
1698         // call out to region server to do split now
1699         split(pair.getSecond(), pair.getFirst(), splitPoint);
1700       }
1701     } finally {
1702       if (zookeeper != null) {
1703         zookeeper.close();
1704       }
1705     }
1706   }
1707
1708   @Override
1709   public void splitRegion(final byte[] regionName, final byte [] splitPoint) throws IOException {
1710     Pair<HRegionInfo, ServerName> regionServerPair = getRegion(regionName);
1711     if (regionServerPair == null) {
1712       throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName));
1713     }
1714     if (regionServerPair.getFirst() != null &&
1715         regionServerPair.getFirst().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1716       throw new IllegalArgumentException("Can't split replicas directly. "
1717           + "Replicas are auto-split when their primary is split.");
1718     }
1719     if (regionServerPair.getSecond() == null) {
1720       throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
1721     }
1722     split(regionServerPair.getSecond(), regionServerPair.getFirst(), splitPoint);
1723   }
1724
1725   @VisibleForTesting
1726   public void split(final ServerName sn, final HRegionInfo hri,
1727       byte[] splitPoint) throws IOException {
1728     if (hri.getStartKey() != null && splitPoint != null &&
1729          Bytes.compareTo(hri.getStartKey(), splitPoint) == 0) {
1730        throw new IOException("should not give a splitkey which equals to startkey!");
1731     }
1732     PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1733     controller.setPriority(hri.getTable());
1734
1735     // TODO: this does not do retries, it should. Set priority and timeout in controller
1736     AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
1737     ProtobufUtil.split(controller, admin, hri, splitPoint);
1738   }
1739
1740   @Override
1741   public Future<Void> modifyTable(final TableName tableName, final HTableDescriptor htd)
1742       throws IOException {
1743     if (!tableName.equals(htd.getTableName())) {
1744       throw new IllegalArgumentException("the specified table name '" + tableName +
1745         "' doesn't match with the HTD one: " + htd.getTableName());
1746     }
1747
1748     ModifyTableResponse response = executeCallable(
1749       new MasterCallable<ModifyTableResponse>(getConnection()) {
1750         @Override
1751         public ModifyTableResponse call(int callTimeout) throws ServiceException {
1752           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1753           controller.setCallTimeout(callTimeout);
1754           controller.setPriority(tableName);
1755
1756           ModifyTableRequest request = RequestConverter.buildModifyTableRequest(
1757             tableName, htd, ng.getNonceGroup(), ng.newNonce());
1758           return master.modifyTable(controller, request);
1759         }
1760       });
1761
1762     return new ModifyTableFuture(this, tableName, response);
1763   }
1764
1765   private static class ModifyTableFuture extends TableFuture<Void> {
1766     public ModifyTableFuture(final HBaseAdmin admin, final TableName tableName,
1767         final ModifyTableResponse response) {
1768       super(admin, tableName,
1769           (response != null && response.hasProcId()) ? response.getProcId() : null);
1770     }
1771
1772     public ModifyTableFuture(final HBaseAdmin admin, final TableName tableName, final Long procId) {
1773       super(admin, tableName, procId);
1774     }
1775
1776     @Override
1777     public String getOperationType() {
1778       return "MODIFY";
1779     }
1780
1781     @Override
1782     protected Void postOperationResult(final Void result, final long deadlineTs)
1783         throws IOException, TimeoutException {
1784       // The modify operation on the table is asynchronous on the server side irrespective
1785       // of whether Procedure V2 is supported or not. So, we wait in the client till
1786       // all regions get updated.
1787       waitForSchemaUpdate(deadlineTs);
1788       return result;
1789     }
1790   }
1791
1792   /**
1793    * @param regionName Name of a region.
1794    * @return a pair of HRegionInfo and ServerName if <code>regionName</code> is
1795    *  a verified region name (we call {@link
1796    *  MetaTableAccessor#getRegionLocation(Connection, byte[])}
1797    *  else null.
1798    * Throw IllegalArgumentException if <code>regionName</code> is null.
1799    * @throws IOException
1800    */
1801   Pair<HRegionInfo, ServerName> getRegion(final byte[] regionName) throws IOException {
1802     if (regionName == null) {
1803       throw new IllegalArgumentException("Pass a table name or region name");
1804     }
1805     Pair<HRegionInfo, ServerName> pair =
1806       MetaTableAccessor.getRegion(connection, regionName);
1807     if (pair == null) {
1808       final AtomicReference<Pair<HRegionInfo, ServerName>> result =
1809         new AtomicReference<Pair<HRegionInfo, ServerName>>(null);
1810       final String encodedName = Bytes.toString(regionName);
1811       MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
1812         @Override
1813         public boolean visit(Result data) throws IOException {
1814           HRegionInfo info = MetaTableAccessor.getHRegionInfo(data);
1815           if (info == null) {
1816             LOG.warn("No serialized HRegionInfo in " + data);
1817             return true;
1818           }
1819           RegionLocations rl = MetaTableAccessor.getRegionLocations(data);
1820           boolean matched = false;
1821           ServerName sn = null;
1822           if (rl != null) {
1823             for (HRegionLocation h : rl.getRegionLocations()) {
1824               if (h != null && encodedName.equals(h.getRegionInfo().getEncodedName())) {
1825                 sn = h.getServerName();
1826                 info = h.getRegionInfo();
1827                 matched = true;
1828               }
1829             }
1830           }
1831           if (!matched) return true;
1832           result.set(new Pair<HRegionInfo, ServerName>(info, sn));
1833           return false; // found the region, stop
1834         }
1835       };
1836
1837       MetaTableAccessor.fullScanRegions(connection, visitor);
1838       pair = result.get();
1839     }
1840     return pair;
1841   }
1842
1843   /**
1844    * If the input is a region name, it is returned as is. If it's an
1845    * encoded region name, the corresponding region is found from meta
1846    * and its region name is returned. If we can't find any region in
1847    * meta matching the input as either region name or encoded region
1848    * name, the input is returned as is. We don't throw unknown
1849    * region exception.
1850    */
1851   private byte[] getRegionName(
1852       final byte[] regionNameOrEncodedRegionName) throws IOException {
1853     if (Bytes.equals(regionNameOrEncodedRegionName,
1854         HRegionInfo.FIRST_META_REGIONINFO.getRegionName())
1855           || Bytes.equals(regionNameOrEncodedRegionName,
1856             HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
1857       return HRegionInfo.FIRST_META_REGIONINFO.getRegionName();
1858     }
1859     byte[] tmp = regionNameOrEncodedRegionName;
1860     Pair<HRegionInfo, ServerName> regionServerPair = getRegion(regionNameOrEncodedRegionName);
1861     if (regionServerPair != null && regionServerPair.getFirst() != null) {
1862       tmp = regionServerPair.getFirst().getRegionName();
1863     }
1864     return tmp;
1865   }
1866
1867   /**
1868    * Check if table exists or not
1869    * @param tableName Name of a table.
1870    * @return tableName instance
1871    * @throws IOException if a remote or network exception occurs.
1872    * @throws TableNotFoundException if table does not exist.
1873    */
1874   private TableName checkTableExists(final TableName tableName)
1875       throws IOException {
1876     return executeCallable(new ConnectionCallable<TableName>(getConnection()) {
1877       @Override
1878       public TableName call(int callTimeout) throws ServiceException, IOException {
1879         if (!MetaTableAccessor.tableExists(connection, tableName)) {
1880           throw new TableNotFoundException(tableName);
1881         }
1882         return tableName;
1883       }
1884     });
1885   }
1886
1887   @Override
1888   public synchronized void shutdown() throws IOException {
1889     executeCallable(new MasterCallable<Void>(getConnection()) {
1890       @Override
1891       public Void call(int callTimeout) throws ServiceException {
1892         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1893         controller.setCallTimeout(callTimeout);
1894         controller.setPriority(HConstants.HIGH_QOS);
1895         master.shutdown(controller, ShutdownRequest.newBuilder().build());
1896         return null;
1897       }
1898     });
1899   }
1900
1901   @Override
1902   public synchronized void stopMaster() throws IOException {
1903     executeCallable(new MasterCallable<Void>(getConnection()) {
1904       @Override
1905       public Void call(int callTimeout) throws ServiceException {
1906         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1907         controller.setCallTimeout(callTimeout);
1908         controller.setPriority(HConstants.HIGH_QOS);
1909         master.stopMaster(controller, StopMasterRequest.newBuilder().build());
1910         return null;
1911       }
1912     });
1913   }
1914
1915   @Override
1916   public synchronized void stopRegionServer(final String hostnamePort)
1917   throws IOException {
1918     String hostname = Addressing.parseHostname(hostnamePort);
1919     int port = Addressing.parsePort(hostnamePort);
1920     AdminService.BlockingInterface admin =
1921       this.connection.getAdmin(ServerName.valueOf(hostname, port, 0));
1922     StopServerRequest request = RequestConverter.buildStopServerRequest(
1923       "Called by admin client " + this.connection.toString());
1924     PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1925
1926     controller.setPriority(HConstants.HIGH_QOS);
1927     try {
1928       // TODO: this does not do retries, it should. Set priority and timeout in controller
1929       admin.stopServer(controller, request);
1930     } catch (ServiceException se) {
1931       throw ProtobufUtil.getRemoteException(se);
1932     }
1933   }
1934
1935   @Override
1936   public ClusterStatus getClusterStatus() throws IOException {
1937     return executeCallable(new MasterCallable<ClusterStatus>(getConnection()) {
1938       @Override
1939       public ClusterStatus call(int callTimeout) throws ServiceException {
1940         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1941         controller.setCallTimeout(callTimeout);
1942         GetClusterStatusRequest req = RequestConverter.buildGetClusterStatusRequest();
1943         return ProtobufUtil.convert(master.getClusterStatus(controller, req).getClusterStatus());
1944       }
1945     });
1946   }
1947
1948   @Override
1949   public Configuration getConfiguration() {
1950     return this.conf;
1951   }
1952
1953   /**
1954    * Do a get with a timeout against the passed in <code>future<code>.
1955    */
1956   private static <T> T get(final Future<T> future, final long timeout, final TimeUnit units)
1957   throws IOException {
1958     try {
1959       // TODO: how long should we wait? Spin forever?
1960       return future.get(timeout, units);
1961     } catch (InterruptedException e) {
1962       throw new InterruptedIOException("Interrupt while waiting on " + future);
1963     } catch (TimeoutException e) {
1964       throw new TimeoutIOException(e);
1965     } catch (ExecutionException e) {
1966       if (e.getCause() instanceof IOException) {
1967         throw (IOException)e.getCause();
1968       } else {
1969         throw new IOException(e.getCause());
1970       }
1971     }
1972   }
1973
1974   @Override
1975   public void createNamespace(final NamespaceDescriptor descriptor)
1976   throws IOException {
1977     get(createNamespaceAsync(descriptor), this.syncWaitTimeout, TimeUnit.MILLISECONDS);
1978   }
1979
1980   @Override
1981   public Future<Void> createNamespaceAsync(final NamespaceDescriptor descriptor)
1982       throws IOException {
1983     CreateNamespaceResponse response =
1984         executeCallable(new MasterCallable<CreateNamespaceResponse>(getConnection()) {
1985           @Override
1986           public CreateNamespaceResponse call(int callTimeout) throws Exception {
1987             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1988             controller.setCallTimeout(callTimeout);
1989             // TODO: set priority based on NS?
1990             return master.createNamespace(controller,
1991               CreateNamespaceRequest.newBuilder()
1992               .setNamespaceDescriptor(ProtobufUtil
1993                 .toProtoNamespaceDescriptor(descriptor)).build()
1994                 );
1995           }
1996         });
1997     return new NamespaceFuture(this, descriptor.getName(), response.getProcId()) {
1998       @Override
1999       public String getOperationType() {
2000         return "CREATE_NAMESPACE";
2001       }
2002     };
2003   }
2004
2005   @Override
2006   public void modifyNamespace(final NamespaceDescriptor descriptor)
2007   throws IOException {
2008     get(modifyNamespaceAsync(descriptor), this.syncWaitTimeout, TimeUnit.MILLISECONDS);
2009   }
2010
2011   @Override
2012   public Future<Void> modifyNamespaceAsync(final NamespaceDescriptor descriptor)
2013       throws IOException {
2014     ModifyNamespaceResponse response =
2015         executeCallable(new MasterCallable<ModifyNamespaceResponse>(getConnection()) {
2016           @Override
2017           public ModifyNamespaceResponse call(int callTimeout) throws Exception {
2018             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2019             controller.setCallTimeout(callTimeout);
2020             // TODO: set priority based on NS?
2021             return master.modifyNamespace(controller, ModifyNamespaceRequest.newBuilder().
2022               setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build());
2023           }
2024         });
2025     return new NamespaceFuture(this, descriptor.getName(), response.getProcId()) {
2026       @Override
2027       public String getOperationType() {
2028         return "MODIFY_NAMESPACE";
2029       }
2030     };
2031   }
2032
2033   @Override
2034   public void deleteNamespace(final String name)
2035   throws IOException {
2036     get(deleteNamespaceAsync(name), this.syncWaitTimeout, TimeUnit.MILLISECONDS);
2037   }
2038
2039   @Override
2040   public Future<Void> deleteNamespaceAsync(final String name)
2041       throws IOException {
2042     DeleteNamespaceResponse response =
2043         executeCallable(new MasterCallable<DeleteNamespaceResponse>(getConnection()) {
2044           @Override
2045           public DeleteNamespaceResponse call(int callTimeout) throws Exception {
2046             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2047             controller.setCallTimeout(callTimeout);
2048             // TODO: set priority based on NS?
2049             return master.deleteNamespace(controller, DeleteNamespaceRequest.newBuilder().
2050               setNamespaceName(name).build());
2051           }
2052         });
2053     return new NamespaceFuture(this, name, response.getProcId()) {
2054       @Override
2055       public String getOperationType() {
2056         return "DELETE_NAMESPACE";
2057       }
2058     };
2059   }
2060
2061   @Override
2062   public NamespaceDescriptor getNamespaceDescriptor(final String name) throws IOException {
2063     return
2064         executeCallable(new MasterCallable<NamespaceDescriptor>(getConnection()) {
2065           @Override
2066           public NamespaceDescriptor call(int callTimeout) throws Exception {
2067             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2068             controller.setCallTimeout(callTimeout);
2069             return ProtobufUtil.toNamespaceDescriptor(
2070               master.getNamespaceDescriptor(controller, GetNamespaceDescriptorRequest.newBuilder().
2071                 setNamespaceName(name).build()).getNamespaceDescriptor());
2072           }
2073         });
2074   }
2075
2076   @Override
2077   public NamespaceDescriptor[] listNamespaceDescriptors() throws IOException {
2078     return
2079         executeCallable(new MasterCallable<NamespaceDescriptor[]>(getConnection()) {
2080           @Override
2081           public NamespaceDescriptor[] call(int callTimeout) throws Exception {
2082             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2083             controller.setCallTimeout(callTimeout);
2084             List<HBaseProtos.NamespaceDescriptor> list =
2085                 master.listNamespaceDescriptors(controller,
2086                   ListNamespaceDescriptorsRequest.newBuilder().build())
2087                 .getNamespaceDescriptorList();
2088             NamespaceDescriptor[] res = new NamespaceDescriptor[list.size()];
2089             for(int i = 0; i < list.size(); i++) {
2090               res[i] = ProtobufUtil.toNamespaceDescriptor(list.get(i));
2091             }
2092             return res;
2093           }
2094         });
2095   }
2096
2097   @Override
2098   public ProcedureInfo[] listProcedures() throws IOException {
2099     return
2100         executeCallable(new MasterCallable<ProcedureInfo[]>(getConnection()) {
2101           @Override
2102           public ProcedureInfo[] call(int callTimeout) throws Exception {
2103             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2104             controller.setCallTimeout(callTimeout);
2105             List<ProcedureProtos.Procedure> procList = master.listProcedures(
2106               controller, ListProceduresRequest.newBuilder().build()).getProcedureList();
2107             ProcedureInfo[] procInfoList = new ProcedureInfo[procList.size()];
2108             for (int i = 0; i < procList.size(); i++) {
2109               procInfoList[i] = ProcedureUtil.convert(procList.get(i));
2110             }
2111             return procInfoList;
2112           }
2113         });
2114   }
2115
2116   @Override
2117   public HTableDescriptor[] listTableDescriptorsByNamespace(final String name) throws IOException {
2118     return
2119         executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) {
2120           @Override
2121           public HTableDescriptor[] call(int callTimeout) throws Exception {
2122             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2123             controller.setCallTimeout(callTimeout);
2124             List<TableSchema> list =
2125                 master.listTableDescriptorsByNamespace(controller,
2126                   ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name)
2127                   .build()).getTableSchemaList();
2128             HTableDescriptor[] res = new HTableDescriptor[list.size()];
2129             for(int i=0; i < list.size(); i++) {
2130
2131               res[i] = ProtobufUtil.convertToHTableDesc(list.get(i));
2132             }
2133             return res;
2134           }
2135         });
2136   }
2137
2138   @Override
2139   public TableName[] listTableNamesByNamespace(final String name) throws IOException {
2140     return
2141         executeCallable(new MasterCallable<TableName[]>(getConnection()) {
2142           @Override
2143           public TableName[] call(int callTimeout) throws Exception {
2144             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2145             controller.setCallTimeout(callTimeout);
2146             List<HBaseProtos.TableName> tableNames =
2147               master.listTableNamesByNamespace(controller, ListTableNamesByNamespaceRequest.
2148                 newBuilder().setNamespaceName(name).build())
2149                 .getTableNameList();
2150             TableName[] result = new TableName[tableNames.size()];
2151             for (int i = 0; i < tableNames.size(); i++) {
2152               result[i] = ProtobufUtil.toTableName(tableNames.get(i));
2153             }
2154             return result;
2155           }
2156         });
2157   }
2158
2159   /**
2160    * Check to see if HBase is running. Throw an exception if not.
2161    * @param conf system configuration
2162    * @throws MasterNotRunningException if the master is not running
2163    * @throws ZooKeeperConnectionException if unable to connect to zookeeper
2164    */
2165   // Used by tests and by the Merge tool. Merge tool uses it to figure if HBase is up or not.
2166   public static void checkHBaseAvailable(Configuration conf)
2167   throws MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException {
2168     Configuration copyOfConf = HBaseConfiguration.create(conf);
2169     // We set it to make it fail as soon as possible if HBase is not available
2170     copyOfConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
2171     copyOfConf.setInt("zookeeper.recovery.retry", 0);
2172
2173     // Check ZK first.
2174     // If the connection exists, we may have a connection to ZK that does not work anymore
2175     try (ClusterConnection connection =
2176              (ClusterConnection) ConnectionFactory.createConnection(copyOfConf);
2177          ZooKeeperKeepAliveConnection zkw = ((ConnectionImplementation) connection).
2178              getKeepAliveZooKeeperWatcher();) {
2179
2180       // This is NASTY. FIX!!!! Dependent on internal implementation! TODO
2181       zkw.getRecoverableZooKeeper().getZooKeeper().exists(zkw.baseZNode, false);
2182       connection.isMasterRunning();
2183     } catch (IOException e) {
2184       throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
2185     } catch (InterruptedException e) {
2186       throw (InterruptedIOException)
2187           new InterruptedIOException("Can't connect to ZooKeeper").initCause(e);
2188     } catch (KeeperException e) {
2189       throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
2190     }
2191   }
2192
2193   @Override
2194   public List<HRegionInfo> getTableRegions(final TableName tableName)
2195   throws IOException {
2196     ZooKeeperWatcher zookeeper =
2197       new ZooKeeperWatcher(conf, ZK_IDENTIFIER_PREFIX + connection.toString(),
2198         new ThrowableAbortable());
2199     List<HRegionInfo> regions = null;
2200     try {
2201       if (TableName.META_TABLE_NAME.equals(tableName)) {
2202         regions = new MetaTableLocator().getMetaRegions(zookeeper);
2203       } else {
2204         regions = MetaTableAccessor.getTableRegions(connection, tableName, true);
2205       }
2206     } finally {
2207       zookeeper.close();
2208     }
2209     return regions;
2210   }
2211
2212   @Override
2213   public synchronized void close() throws IOException {
2214   }
2215
2216   @Override
2217   public HTableDescriptor[] getTableDescriptorsByTableName(final List<TableName> tableNames)
2218   throws IOException {
2219     return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) {
2220       @Override
2221       public HTableDescriptor[] call(int callTimeout) throws Exception {
2222         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2223         controller.setCallTimeout(callTimeout);
2224         GetTableDescriptorsRequest req =
2225             RequestConverter.buildGetTableDescriptorsRequest(tableNames);
2226           return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(controller, req));
2227       }
2228     });
2229   }
2230
2231   /**
2232    * Get tableDescriptor
2233    * @param tableName one table name
2234    * @return HTD the HTableDescriptor or null if the table not exists
2235    * @throws IOException if a remote or network exception occurs
2236    */
2237   private HTableDescriptor getTableDescriptorByTableName(TableName tableName)
2238       throws IOException {
2239     List<TableName> tableNames = new ArrayList<TableName>(1);
2240     tableNames.add(tableName);
2241
2242     HTableDescriptor[] htdl = getTableDescriptorsByTableName(tableNames);
2243
2244     if (htdl == null || htdl.length == 0) {
2245       return null;
2246     }
2247     else {
2248       return htdl[0];
2249     }
2250   }
2251
2252   @Override
2253   public HTableDescriptor[] getTableDescriptors(List<String> names)
2254   throws IOException {
2255     List<TableName> tableNames = new ArrayList<TableName>(names.size());
2256     for(String name : names) {
2257       tableNames.add(TableName.valueOf(name));
2258     }
2259     return getTableDescriptorsByTableName(tableNames);
2260   }
2261
2262   private RollWALWriterResponse rollWALWriterImpl(final ServerName sn) throws IOException,
2263       FailedLogCloseException {
2264     AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
2265     RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest();
2266     PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2267
2268     try {
2269       // TODO: this does not do retries, it should. Set priority and timeout in controller
2270       return admin.rollWALWriter(controller, request);
2271     } catch (ServiceException se) {
2272       throw ProtobufUtil.getRemoteException(se);
2273     }
2274   }
2275
2276   /**
2277    * Roll the log writer. I.e. when using a file system based write ahead log,
2278    * start writing log messages to a new file.
2279    *
2280    * Note that when talking to a version 1.0+ HBase deployment, the rolling is asynchronous.
2281    * This method will return as soon as the roll is requested and the return value will
2282    * always be null. Additionally, the named region server may schedule store flushes at the
2283    * request of the wal handling the roll request.
2284    *
2285    * When talking to a 0.98 or older HBase deployment, the rolling is synchronous and the
2286    * return value may be either null or a list of encoded region names.
2287    *
2288    * @param serverName
2289    *          The servername of the regionserver. A server name is made of host,
2290    *          port and startcode. This is mandatory. Here is an example:
2291    *          <code> host187.example.com,60020,1289493121758</code>
2292    * @return a set of {@link HRegionInfo#getEncodedName()} that would allow the wal to
2293    *         clean up some underlying files. null if there's nothing to flush.
2294    * @throws IOException if a remote or network exception occurs
2295    * @throws FailedLogCloseException
2296    * @deprecated use {@link #rollWALWriter(ServerName)}
2297    */
2298   @Deprecated
2299   public synchronized byte[][] rollHLogWriter(String serverName)
2300       throws IOException, FailedLogCloseException {
2301     ServerName sn = ServerName.valueOf(serverName);
2302     final RollWALWriterResponse response = rollWALWriterImpl(sn);
2303     int regionCount = response.getRegionToFlushCount();
2304     if (0 == regionCount) {
2305       return null;
2306     }
2307     byte[][] regionsToFlush = new byte[regionCount][];
2308     for (int i = 0; i < regionCount; i++) {
2309       ByteString region = response.getRegionToFlush(i);
2310       regionsToFlush[i] = region.toByteArray();
2311     }
2312     return regionsToFlush;
2313   }
2314
2315   @Override
2316   public synchronized void rollWALWriter(ServerName serverName)
2317       throws IOException, FailedLogCloseException {
2318     rollWALWriterImpl(serverName);
2319   }
2320
2321   @Override
2322   public String[] getMasterCoprocessors() {
2323     try {
2324       return getClusterStatus().getMasterCoprocessors();
2325     } catch (IOException e) {
2326       LOG.error("Could not getClusterStatus()",e);
2327       return null;
2328     }
2329   }
2330
2331   @Override
2332   public CompactionState getCompactionState(final TableName tableName)
2333   throws IOException {
2334     return getCompactionState(tableName, CompactType.NORMAL);
2335   }
2336
2337   @Override
2338   public CompactionState getCompactionStateForRegion(final byte[] regionName)
2339   throws IOException {
2340     try {
2341       Pair<HRegionInfo, ServerName> regionServerPair = getRegion(regionName);
2342       if (regionServerPair == null) {
2343         throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName));
2344       }
2345       if (regionServerPair.getSecond() == null) {
2346         throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
2347       }
2348       ServerName sn = regionServerPair.getSecond();
2349       AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
2350       GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
2351         regionServerPair.getFirst().getRegionName(), true);
2352       PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2353       // TODO: this does not do retries, it should. Set priority and timeout in controller
2354       GetRegionInfoResponse response = admin.getRegionInfo(controller, request);
2355       if (response.getCompactionState() != null) {
2356         return ProtobufUtil.createCompactionState(response.getCompactionState());
2357       }
2358       return null;
2359     } catch (ServiceException se) {
2360       throw ProtobufUtil.getRemoteException(se);
2361     }
2362   }
2363
2364   @Override
2365   public void snapshot(final String snapshotName,
2366                        final TableName tableName) throws IOException,
2367       SnapshotCreationException, IllegalArgumentException {
2368     snapshot(snapshotName, tableName, SnapshotType.FLUSH);
2369   }
2370
2371   @Override
2372   public void snapshot(final byte[] snapshotName, final TableName tableName)
2373       throws IOException, SnapshotCreationException, IllegalArgumentException {
2374     snapshot(Bytes.toString(snapshotName), tableName, SnapshotType.FLUSH);
2375   }
2376
2377   @Override
2378   public void snapshot(final String snapshotName, final TableName tableName,
2379       SnapshotType type)
2380       throws IOException, SnapshotCreationException, IllegalArgumentException {
2381     snapshot(new SnapshotDescription(snapshotName, tableName.getNameAsString(), type));
2382   }
2383
2384   @Override
2385   public void snapshot(SnapshotDescription snapshotDesc)
2386       throws IOException, SnapshotCreationException, IllegalArgumentException {
2387     // actually take the snapshot
2388     HBaseProtos.SnapshotDescription snapshot = createHBaseProtosSnapshotDesc(snapshotDesc);
2389     SnapshotResponse response = asyncSnapshot(snapshot);
2390     final IsSnapshotDoneRequest request =
2391         IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build();
2392     IsSnapshotDoneResponse done = null;
2393     long start = EnvironmentEdgeManager.currentTime();
2394     long max = response.getExpectedTimeout();
2395     long maxPauseTime = max / this.numRetries;
2396     int tries = 0;
2397     LOG.debug("Waiting a max of " + max + " ms for snapshot '" +
2398         ClientSnapshotDescriptionUtils.toString(snapshot) + "'' to complete. (max " +
2399         maxPauseTime + " ms per retry)");
2400     while (tries == 0
2401         || ((EnvironmentEdgeManager.currentTime() - start) < max && !done.getDone())) {
2402       try {
2403         // sleep a backoff <= pauseTime amount
2404         long sleep = getPauseTime(tries++);
2405         sleep = sleep > maxPauseTime ? maxPauseTime : sleep;
2406         LOG.debug("(#" + tries + ") Sleeping: " + sleep +
2407           "ms while waiting for snapshot completion.");
2408         Thread.sleep(sleep);
2409       } catch (InterruptedException e) {
2410         throw (InterruptedIOException)new InterruptedIOException("Interrupted").initCause(e);
2411       }
2412       LOG.debug("Getting current status of snapshot from master...");
2413       done = executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection()) {
2414         @Override
2415         public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException {
2416           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2417           controller.setCallTimeout(callTimeout);
2418           return master.isSnapshotDone(controller, request);
2419         }
2420       });
2421     }
2422     if (!done.getDone()) {
2423       throw new SnapshotCreationException("Snapshot '" + snapshot.getName()
2424           + "' wasn't completed in expectedTime:" + max + " ms", snapshotDesc);
2425     }
2426   }
2427
2428   @Override
2429   public void takeSnapshotAsync(SnapshotDescription snapshotDesc) throws IOException,
2430       SnapshotCreationException {
2431     HBaseProtos.SnapshotDescription snapshot = createHBaseProtosSnapshotDesc(snapshotDesc);
2432     asyncSnapshot(snapshot);
2433   }
2434
2435   private HBaseProtos.SnapshotDescription
2436       createHBaseProtosSnapshotDesc(SnapshotDescription snapshotDesc) {
2437     HBaseProtos.SnapshotDescription.Builder builder = HBaseProtos.SnapshotDescription.newBuilder();
2438     if (snapshotDesc.getTable() != null) {
2439       builder.setTable(snapshotDesc.getTable());
2440     }
2441     if (snapshotDesc.getName() != null) {
2442       builder.setName(snapshotDesc.getName());
2443     }
2444     if (snapshotDesc.getOwner() != null) {
2445       builder.setOwner(snapshotDesc.getOwner());
2446     }
2447     if (snapshotDesc.getCreationTime() != -1) {
2448       builder.setCreationTime(snapshotDesc.getCreationTime());
2449     }
2450     if (snapshotDesc.getVersion() != -1) {
2451       builder.setVersion(snapshotDesc.getVersion());
2452     }
2453     builder.setType(ProtobufUtil.createProtosSnapShotDescType(snapshotDesc.getType()));
2454     HBaseProtos.SnapshotDescription snapshot = builder.build();
2455     return snapshot;
2456   }
2457
2458   private SnapshotResponse asyncSnapshot(HBaseProtos.SnapshotDescription snapshot)
2459       throws IOException {
2460     ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2461     final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot)
2462         .build();
2463     // run the snapshot on the master
2464     return executeCallable(new MasterCallable<SnapshotResponse>(getConnection()) {
2465       @Override
2466       public SnapshotResponse call(int callTimeout) throws ServiceException {
2467         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2468         controller.setCallTimeout(callTimeout);
2469         return master.snapshot(controller, request);
2470       }
2471     });
2472   }
2473
2474   @Override
2475   public boolean isSnapshotFinished(final SnapshotDescription snapshotDesc)
2476       throws IOException, HBaseSnapshotException, UnknownSnapshotException {
2477     final HBaseProtos.SnapshotDescription snapshot = createHBaseProtosSnapshotDesc(snapshotDesc);
2478     return executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection()) {
2479       @Override
2480       public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException {
2481         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2482         controller.setCallTimeout(callTimeout);
2483         return master.isSnapshotDone(controller,
2484           IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build());
2485       }
2486     }).getDone();
2487   }
2488
2489   @Override
2490   public void restoreSnapshot(final byte[] snapshotName)
2491       throws IOException, RestoreSnapshotException {
2492     restoreSnapshot(Bytes.toString(snapshotName));
2493   }
2494
2495   @Override
2496   public void restoreSnapshot(final String snapshotName)
2497       throws IOException, RestoreSnapshotException {
2498     boolean takeFailSafeSnapshot =
2499       conf.getBoolean("hbase.snapshot.restore.take.failsafe.snapshot", false);
2500     restoreSnapshot(snapshotName, takeFailSafeSnapshot);
2501   }
2502
2503   @Override
2504   public void restoreSnapshot(final byte[] snapshotName, final boolean takeFailSafeSnapshot)
2505       throws IOException, RestoreSnapshotException {
2506     restoreSnapshot(Bytes.toString(snapshotName), takeFailSafeSnapshot);
2507   }
2508
2509   /*
2510    * Check whether the snapshot exists and contains disabled table
2511    *
2512    * @param snapshotName name of the snapshot to restore
2513    * @throws IOException if a remote or network exception occurs
2514    * @throws RestoreSnapshotException if no valid snapshot is found
2515    */
2516   private TableName getTableNameBeforeRestoreSnapshot(final String snapshotName)
2517       throws IOException, RestoreSnapshotException {
2518     TableName tableName = null;
2519     for (SnapshotDescription snapshotInfo: listSnapshots()) {
2520       if (snapshotInfo.getName().equals(snapshotName)) {
2521         tableName = TableName.valueOf(snapshotInfo.getTable());
2522         break;
2523       }
2524     }
2525
2526     if (tableName == null) {
2527       throw new RestoreSnapshotException(
2528         "Unable to find the table name for snapshot=" + snapshotName);
2529     }
2530     return tableName;
2531   }
2532
2533   @Override
2534   public void restoreSnapshot(final String snapshotName, final boolean takeFailSafeSnapshot)
2535       throws IOException, RestoreSnapshotException {
2536     TableName tableName = getTableNameBeforeRestoreSnapshot(snapshotName);
2537
2538     // The table does not exists, switch to clone.
2539     if (!tableExists(tableName)) {
2540       cloneSnapshot(snapshotName, tableName);
2541       return;
2542     }
2543
2544     // Check if the table is disabled
2545     if (!isTableDisabled(tableName)) {
2546       throw new TableNotDisabledException(tableName);
2547     }
2548
2549     // Take a snapshot of the current state
2550     String failSafeSnapshotSnapshotName = null;
2551     if (takeFailSafeSnapshot) {
2552       failSafeSnapshotSnapshotName = conf.get("hbase.snapshot.restore.failsafe.name",
2553         "hbase-failsafe-{snapshot.name}-{restore.timestamp}");
2554       failSafeSnapshotSnapshotName = failSafeSnapshotSnapshotName
2555         .replace("{snapshot.name}", snapshotName)
2556         .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
2557         .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
2558       LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2559       snapshot(failSafeSnapshotSnapshotName, tableName);
2560     }
2561
2562     try {
2563       // Restore snapshot
2564       get(
2565         internalRestoreSnapshotAsync(snapshotName, tableName),
2566         syncWaitTimeout,
2567         TimeUnit.MILLISECONDS);
2568     } catch (IOException e) {
2569       // Somthing went wrong during the restore...
2570       // if the pre-restore snapshot is available try to rollback
2571       if (takeFailSafeSnapshot) {
2572         try {
2573           get(
2574             internalRestoreSnapshotAsync(failSafeSnapshotSnapshotName, tableName),
2575             syncWaitTimeout,
2576             TimeUnit.MILLISECONDS);
2577           String msg = "Restore snapshot=" + snapshotName +
2578             " failed. Rollback to snapshot=" + failSafeSnapshotSnapshotName + " succeeded.";
2579           LOG.error(msg, e);
2580           throw new RestoreSnapshotException(msg, e);
2581         } catch (IOException ex) {
2582           String msg = "Failed to restore and rollback to snapshot=" + failSafeSnapshotSnapshotName;
2583           LOG.error(msg, ex);
2584           throw new RestoreSnapshotException(msg, e);
2585         }
2586       } else {
2587         throw new RestoreSnapshotException("Failed to restore snapshot=" + snapshotName, e);
2588       }
2589     }
2590
2591     // If the restore is succeeded, delete the pre-restore snapshot
2592     if (takeFailSafeSnapshot) {
2593       try {
2594         LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2595         deleteSnapshot(failSafeSnapshotSnapshotName);
2596       } catch (IOException e) {
2597         LOG.error("Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, e);
2598       }
2599     }
2600   }
2601
2602   @Override
2603   public Future<Void> restoreSnapshotAsync(final String snapshotName)
2604       throws IOException, RestoreSnapshotException {
2605     TableName tableName = getTableNameBeforeRestoreSnapshot(snapshotName);
2606
2607     // The table does not exists, switch to clone.
2608     if (!tableExists(tableName)) {
2609       return cloneSnapshotAsync(snapshotName, tableName);
2610     }
2611
2612     // Check if the table is disabled
2613     if (!isTableDisabled(tableName)) {
2614       throw new TableNotDisabledException(tableName);
2615     }
2616
2617     return internalRestoreSnapshotAsync(snapshotName, tableName);
2618   }
2619
2620   @Override
2621   public void cloneSnapshot(final byte[] snapshotName, final TableName tableName)
2622       throws IOException, TableExistsException, RestoreSnapshotException {
2623     cloneSnapshot(Bytes.toString(snapshotName), tableName);
2624   }
2625
2626   @Override
2627   public void cloneSnapshot(final String snapshotName, final TableName tableName)
2628       throws IOException, TableExistsException, RestoreSnapshotException {
2629     if (tableExists(tableName)) {
2630       throw new TableExistsException(tableName);
2631     }
2632     get(
2633       internalRestoreSnapshotAsync(snapshotName, tableName),
2634       Integer.MAX_VALUE,
2635       TimeUnit.MILLISECONDS);
2636   }
2637
2638   @Override
2639   public Future<Void> cloneSnapshotAsync(final String snapshotName, final TableName tableName)
2640       throws IOException, TableExistsException {
2641     if (tableExists(tableName)) {
2642       throw new TableExistsException(tableName);
2643     }
2644     return internalRestoreSnapshotAsync(snapshotName, tableName);
2645   }
2646
2647   @Override
2648   public byte[] execProcedureWithRet(String signature, String instance, Map<String, String> props)
2649       throws IOException {
2650     ProcedureDescription.Builder builder = ProcedureDescription.newBuilder();
2651     builder.setSignature(signature).setInstance(instance);
2652     for (Entry<String, String> entry : props.entrySet()) {
2653       NameStringPair pair = NameStringPair.newBuilder().setName(entry.getKey())
2654           .setValue(entry.getValue()).build();
2655       builder.addConfiguration(pair);
2656     }
2657
2658     final ExecProcedureRequest request = ExecProcedureRequest.newBuilder()
2659         .setProcedure(builder.build()).build();
2660     // run the procedure on the master
2661     ExecProcedureResponse response = executeCallable(new MasterCallable<ExecProcedureResponse>(
2662         getConnection()) {
2663       @Override
2664       public ExecProcedureResponse call(int callTimeout) throws ServiceException {
2665         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2666         controller.setCallTimeout(callTimeout);
2667         return master.execProcedureWithRet(controller, request);
2668       }
2669     });
2670
2671     return response.hasReturnData() ? response.getReturnData().toByteArray() : null;
2672   }
2673
2674   @Override
2675   public void execProcedure(String signature, String instance, Map<String, String> props)
2676       throws IOException {
2677     ProcedureDescription.Builder builder = ProcedureDescription.newBuilder();
2678     builder.setSignature(signature).setInstance(instance);
2679     for (Entry<String, String> entry : props.entrySet()) {
2680       NameStringPair pair = NameStringPair.newBuilder().setName(entry.getKey())
2681           .setValue(entry.getValue()).build();
2682       builder.addConfiguration(pair);
2683     }
2684
2685     final ExecProcedureRequest request = ExecProcedureRequest.newBuilder()
2686         .setProcedure(builder.build()).build();
2687     // run the procedure on the master
2688     ExecProcedureResponse response = executeCallable(new MasterCallable<ExecProcedureResponse>(
2689         getConnection()) {
2690       @Override
2691       public ExecProcedureResponse call(int callTimeout) throws ServiceException {
2692         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2693         controller.setCallTimeout(callTimeout);
2694         return master.execProcedure(controller, request);
2695       }
2696     });
2697
2698     long start = EnvironmentEdgeManager.currentTime();
2699     long max = response.getExpectedTimeout();
2700     long maxPauseTime = max / this.numRetries;
2701     int tries = 0;
2702     LOG.debug("Waiting a max of " + max + " ms for procedure '" +
2703         signature + " : " + instance + "'' to complete. (max " + maxPauseTime + " ms per retry)");
2704     boolean done = false;
2705     while (tries == 0
2706         || ((EnvironmentEdgeManager.currentTime() - start) < max && !done)) {
2707       try {
2708         // sleep a backoff <= pauseTime amount
2709         long sleep = getPauseTime(tries++);
2710         sleep = sleep > maxPauseTime ? maxPauseTime : sleep;
2711         LOG.debug("(#" + tries + ") Sleeping: " + sleep +
2712           "ms while waiting for procedure completion.");
2713         Thread.sleep(sleep);
2714       } catch (InterruptedException e) {
2715         throw (InterruptedIOException)new InterruptedIOException("Interrupted").initCause(e);
2716       }
2717       LOG.debug("Getting current status of procedure from master...");
2718       done = isProcedureFinished(signature, instance, props);
2719     }
2720     if (!done) {
2721       throw new IOException("Procedure '" + signature + " : " + instance
2722           + "' wasn't completed in expectedTime:" + max + " ms");
2723     }
2724   }
2725
2726   @Override
2727   public boolean isProcedureFinished(String signature, String instance, Map<String, String> props)
2728       throws IOException {
2729     final ProcedureDescription.Builder builder = ProcedureDescription.newBuilder();
2730     builder.setSignature(signature).setInstance(instance);
2731     for (Entry<String, String> entry : props.entrySet()) {
2732       NameStringPair pair = NameStringPair.newBuilder().setName(entry.getKey())
2733           .setValue(entry.getValue()).build();
2734       builder.addConfiguration(pair);
2735     }
2736     final ProcedureDescription desc = builder.build();
2737     return executeCallable(
2738         new MasterCallable<IsProcedureDoneResponse>(getConnection()) {
2739           @Override
2740           public IsProcedureDoneResponse call(int callTimeout) throws ServiceException {
2741             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2742             controller.setCallTimeout(callTimeout);
2743             return master.isProcedureDone(controller, IsProcedureDoneRequest
2744                 .newBuilder().setProcedure(desc).build());
2745           }
2746         }).getDone();
2747   }
2748
2749   /**
2750    * Execute Restore/Clone snapshot and wait for the server to complete (blocking).
2751    * To check if the cloned table exists, use {@link #isTableAvailable} -- it is not safe to
2752    * create an HTable instance to this table before it is available.
2753    * @param snapshotName snapshot to restore
2754    * @param tableName table name to restore the snapshot on
2755    * @throws IOException if a remote or network exception occurs
2756    * @throws RestoreSnapshotException if snapshot failed to be restored
2757    * @throws IllegalArgumentException if the restore request is formatted incorrectly
2758    */
2759   private Future<Void> internalRestoreSnapshotAsync(
2760       final String snapshotName,
2761       final TableName tableName) throws IOException, RestoreSnapshotException {
2762     final HBaseProtos.SnapshotDescription snapshot = HBaseProtos.SnapshotDescription.newBuilder()
2763         .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2764
2765     // actually restore the snapshot
2766     ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2767
2768     RestoreSnapshotResponse response = executeCallable(
2769         new MasterCallable<RestoreSnapshotResponse>(getConnection()) {
2770       @Override
2771       public RestoreSnapshotResponse call(int callTimeout) throws ServiceException {
2772         final RestoreSnapshotRequest request = RestoreSnapshotRequest.newBuilder()
2773             .setSnapshot(snapshot)
2774             .setNonceGroup(ng.getNonceGroup())
2775             .setNonce(ng.newNonce())
2776             .build();
2777         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2778         controller.setCallTimeout(callTimeout);
2779         return master.restoreSnapshot(controller, request);
2780       }
2781     });
2782
2783     return new RestoreSnapshotFuture(
2784       this, snapshot, TableName.valueOf(snapshot.getTable()), response);
2785   }
2786
2787   private static class RestoreSnapshotFuture extends TableFuture<Void> {
2788     public RestoreSnapshotFuture(
2789         final HBaseAdmin admin,
2790         final HBaseProtos.SnapshotDescription snapshot,
2791         final TableName tableName,
2792         final RestoreSnapshotResponse response) {
2793       super(admin, tableName,
2794           (response != null && response.hasProcId()) ? response.getProcId() : null);
2795
2796       if (response != null && !response.hasProcId()) {
2797         throw new UnsupportedOperationException("Client could not call old version of Server");
2798       }
2799     }
2800
2801     public RestoreSnapshotFuture(
2802         final HBaseAdmin admin,
2803         final TableName tableName,
2804         final Long procId) {
2805       super(admin, tableName, procId);
2806     }
2807
2808     @Override
2809     public String getOperationType() {
2810       return "MODIFY";
2811     }
2812   }
2813
2814   @Override
2815   public List<SnapshotDescription> listSnapshots() throws IOException {
2816     return executeCallable(new MasterCallable<List<SnapshotDescription>>(getConnection()) {
2817       @Override
2818       public List<SnapshotDescription> call(int callTimeout) throws ServiceException {
2819         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2820         controller.setCallTimeout(callTimeout);
2821         List<HBaseProtos.SnapshotDescription> snapshotsList = master
2822             .getCompletedSnapshots(controller, GetCompletedSnapshotsRequest.newBuilder().build())
2823             .getSnapshotsList();
2824         List<SnapshotDescription> result = new ArrayList<SnapshotDescription>(snapshotsList.size());
2825         for (HBaseProtos.SnapshotDescription snapshot : snapshotsList) {
2826           result.add(new SnapshotDescription(snapshot.getName(), snapshot.getTable(),
2827               ProtobufUtil.createSnapshotType(snapshot.getType()), snapshot.getOwner(),
2828               snapshot.getCreationTime(), snapshot.getVersion()));
2829         }
2830         return result;
2831       }
2832     });
2833   }
2834
2835   @Override
2836   public List<SnapshotDescription> listSnapshots(String regex) throws IOException {
2837     return listSnapshots(Pattern.compile(regex));
2838   }
2839
2840   @Override
2841   public List<SnapshotDescription> listSnapshots(Pattern pattern) throws IOException {
2842     List<SnapshotDescription> matched = new LinkedList<SnapshotDescription>();
2843     List<SnapshotDescription> snapshots = listSnapshots();
2844     for (SnapshotDescription snapshot : snapshots) {
2845       if (pattern.matcher(snapshot.getName()).matches()) {
2846         matched.add(snapshot);
2847       }
2848     }
2849     return matched;
2850   }
2851
2852   @Override
2853   public List<SnapshotDescription> listTableSnapshots(String tableNameRegex,
2854       String snapshotNameRegex) throws IOException {
2855     return listTableSnapshots(Pattern.compile(tableNameRegex), Pattern.compile(snapshotNameRegex));
2856   }
2857
2858   @Override
2859   public List<SnapshotDescription> listTableSnapshots(Pattern tableNamePattern,
2860       Pattern snapshotNamePattern) throws IOException {
2861     TableName[] tableNames = listTableNames(tableNamePattern);
2862
2863     List<SnapshotDescription> tableSnapshots = new LinkedList<SnapshotDescription>();
2864     List<SnapshotDescription> snapshots = listSnapshots(snapshotNamePattern);
2865
2866     List<TableName> listOfTableNames = Arrays.asList(tableNames);
2867     for (SnapshotDescription snapshot : snapshots) {
2868       if (listOfTableNames.contains(TableName.valueOf(snapshot.getTable()))) {
2869         tableSnapshots.add(snapshot);
2870       }
2871     }
2872     return tableSnapshots;
2873   }
2874
2875   @Override
2876   public void deleteSnapshot(final byte[] snapshotName) throws IOException {
2877     deleteSnapshot(Bytes.toString(snapshotName));
2878   }
2879
2880   @Override
2881   public void deleteSnapshot(final String snapshotName) throws IOException {
2882     // make sure the snapshot is possibly valid
2883     TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(snapshotName));
2884     // do the delete
2885     executeCallable(new MasterCallable<Void>(getConnection()) {
2886       @Override
2887       public Void call(int callTimeout) throws ServiceException {
2888         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2889         controller.setCallTimeout(callTimeout);
2890         master.deleteSnapshot(controller,
2891           DeleteSnapshotRequest.newBuilder().
2892               setSnapshot(
2893                 HBaseProtos.SnapshotDescription.newBuilder().setName(snapshotName).build())
2894               .build()
2895         );
2896         return null;
2897       }
2898     });
2899   }
2900
2901   @Override
2902   public void deleteSnapshots(final String regex) throws IOException {
2903     deleteSnapshots(Pattern.compile(regex));
2904   }
2905
2906   @Override
2907   public void deleteSnapshots(final Pattern pattern) throws IOException {
2908     List<SnapshotDescription> snapshots = listSnapshots(pattern);
2909     for (final SnapshotDescription snapshot : snapshots) {
2910       try {
2911         internalDeleteSnapshot(snapshot);
2912       } catch (IOException ex) {
2913         LOG.info(
2914           "Failed to delete snapshot " + snapshot.getName() + " for table " + snapshot.getTable(),
2915           ex);
2916       }
2917     }
2918   }
2919
2920   private void internalDeleteSnapshot(final SnapshotDescription snapshot) throws IOException {
2921     executeCallable(new MasterCallable<Void>(getConnection()) {
2922       @Override
2923       public Void call(int callTimeout) throws ServiceException {
2924         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2925         controller.setCallTimeout(callTimeout);
2926         this.master.deleteSnapshot(controller, DeleteSnapshotRequest.newBuilder()
2927           .setSnapshot(createHBaseProtosSnapshotDesc(snapshot)).build());
2928         return null;
2929       }
2930     });
2931   }
2932
2933   @Override
2934   public void deleteTableSnapshots(String tableNameRegex, String snapshotNameRegex)
2935       throws IOException {
2936     deleteTableSnapshots(Pattern.compile(tableNameRegex), Pattern.compile(snapshotNameRegex));
2937   }
2938
2939   @Override
2940   public void deleteTableSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern)
2941       throws IOException {
2942     List<SnapshotDescription> snapshots = listTableSnapshots(tableNamePattern, snapshotNamePattern);
2943     for (SnapshotDescription snapshot : snapshots) {
2944       try {
2945         internalDeleteSnapshot(snapshot);
2946         LOG.debug("Successfully deleted snapshot: " + snapshot.getName());
2947       } catch (IOException e) {
2948         LOG.error("Failed to delete snapshot: " + snapshot.getName(), e);
2949       }
2950     }
2951   }
2952
2953   @Override
2954   public void setQuota(final QuotaSettings quota) throws IOException {
2955     executeCallable(new MasterCallable<Void>(getConnection()) {
2956       @Override
2957       public Void call(int callTimeout) throws ServiceException {
2958         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
2959         controller.setCallTimeout(callTimeout);
2960         this.master.setQuota(controller, QuotaSettings.buildSetQuotaRequestProto(quota));
2961         return null;
2962       }
2963     });
2964   }
2965
2966   @Override
2967   public QuotaRetriever getQuotaRetriever(final QuotaFilter filter) throws IOException {
2968     return QuotaRetriever.open(conf, filter);
2969   }
2970
2971   private <C extends RetryingCallable<V> & Closeable, V> V executeCallable(C callable)
2972       throws IOException {
2973     return executeCallable(callable, rpcCallerFactory, operationTimeout, rpcTimeout);
2974   }
2975
2976   static private <C extends RetryingCallable<V> & Closeable, V> V executeCallable(C callable,
2977              RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout,
2978       int rpcTimeout) throws IOException {
2979     RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller(rpcTimeout);
2980     try {
2981       return caller.callWithRetries(callable, operationTimeout);
2982     } finally {
2983       callable.close();
2984     }
2985   }
2986
2987   @Override
2988   public CoprocessorRpcChannel coprocessorService() {
2989     return new MasterCoprocessorRpcChannel(connection);
2990   }
2991
2992   /**
2993    * Simple {@link Abortable}, throwing RuntimeException on abort.
2994    */
2995   private static class ThrowableAbortable implements Abortable {
2996
2997     @Override
2998     public void abort(String why, Throwable e) {
2999       throw new RuntimeException(why, e);
3000     }
3001
3002     @Override
3003     public boolean isAborted() {
3004       return true;
3005     }
3006   }
3007
3008   @Override
3009   public CoprocessorRpcChannel coprocessorService(ServerName sn) {
3010     return new RegionServerCoprocessorRpcChannel(connection, sn);
3011   }
3012
3013   @Override
3014   public void updateConfiguration(ServerName server) throws IOException {
3015     try {
3016       this.connection.getAdmin(server).updateConfiguration(null,
3017         UpdateConfigurationRequest.getDefaultInstance());
3018     } catch (ServiceException e) {
3019       throw ProtobufUtil.getRemoteException(e);
3020     }
3021   }
3022
3023   @Override
3024   public void updateConfiguration() throws IOException {
3025     for (ServerName server : this.getClusterStatus().getServers()) {
3026       updateConfiguration(server);
3027     }
3028   }
3029
3030   @Override
3031   public int getMasterInfoPort() throws IOException {
3032     // TODO: Fix!  Reaching into internal implementation!!!!
3033     ConnectionImplementation connection =
3034         (ConnectionImplementation)this.connection;
3035     ZooKeeperKeepAliveConnection zkw = connection.getKeepAliveZooKeeperWatcher();
3036     try {
3037       return MasterAddressTracker.getMasterInfoPort(zkw);
3038     } catch (KeeperException e) {
3039       throw new IOException("Failed to get master info port from MasterAddressTracker", e);
3040     }
3041   }
3042
3043   private ServerName getMasterAddress() throws IOException {
3044     // TODO: Fix!  Reaching into internal implementation!!!!
3045     ConnectionImplementation connection =
3046             (ConnectionImplementation)this.connection;
3047     ZooKeeperKeepAliveConnection zkw = connection.getKeepAliveZooKeeperWatcher();
3048     try {
3049       return MasterAddressTracker.getMasterAddress(zkw);
3050     } catch (KeeperException e) {
3051       throw new IOException("Failed to get master server name from MasterAddressTracker", e);
3052     }
3053   }
3054
3055   @Override
3056   public long getLastMajorCompactionTimestamp(final TableName tableName) throws IOException {
3057     return executeCallable(new MasterCallable<Long>(getConnection()) {
3058       @Override
3059       public Long call(int callTimeout) throws ServiceException {
3060         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
3061         controller.setCallTimeout(callTimeout);
3062         MajorCompactionTimestampRequest req =
3063             MajorCompactionTimestampRequest.newBuilder()
3064                 .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
3065         return master.getLastMajorCompactionTimestamp(controller, req).getCompactionTimestamp();
3066       }
3067     });
3068   }
3069
3070   @Override
3071   public long getLastMajorCompactionTimestampForRegion(final byte[] regionName) throws IOException {
3072     return executeCallable(new MasterCallable<Long>(getConnection()) {
3073       @Override
3074       public Long call(int callTimeout) throws ServiceException {
3075         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
3076         controller.setCallTimeout(callTimeout);
3077         MajorCompactionTimestampForRegionRequest req =
3078             MajorCompactionTimestampForRegionRequest
3079                 .newBuilder()
3080                 .setRegion(
3081                   RequestConverter
3082                       .buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)).build();
3083         return master.getLastMajorCompactionTimestampForRegion(controller, req)
3084             .getCompactionTimestamp();
3085       }
3086     });
3087   }
3088
3089   /**
3090    * {@inheritDoc}
3091    */
3092   @Override
3093   public void compact(final TableName tableName, final byte[] columnFamily, CompactType compactType)
3094     throws IOException, InterruptedException {
3095     compact(tableName, columnFamily, false, compactType);
3096   }
3097
3098   /**
3099    * {@inheritDoc}
3100    */
3101   @Override
3102   public void compact(final TableName tableName, CompactType compactType)
3103     throws IOException, InterruptedException {
3104     compact(tableName, null, false, compactType);
3105   }
3106
3107   /**
3108    * {@inheritDoc}
3109    */
3110   @Override
3111   public void majorCompact(final TableName tableName, final byte[] columnFamily,
3112     CompactType compactType) throws IOException, InterruptedException {
3113     compact(tableName, columnFamily, true, compactType);
3114   }
3115
3116   /**
3117    * {@inheritDoc}
3118    */
3119   @Override
3120   public void majorCompact(final TableName tableName, CompactType compactType)
3121           throws IOException, InterruptedException {
3122       compact(tableName, null, true, compactType);
3123   }
3124
  /**
   * {@inheritDoc}
   * <p>For {@code MOB}, the state is read from a single region-info call made against the master
   * for the table's MOB region. For {@code NORMAL} (and any other type), every online, assigned
   * region of the table is queried on its hosting server and the per-region states are merged:
   * seeing both a major and a minor compaction, on the same or on different regions, yields
   * {@code MAJOR_AND_MINOR}.
   *
   * @param tableName table to inspect; its existence is verified first
   * @param compactType selects the MOB or the per-region code path
   * @return the merged compaction state
   * @throws IOException on lookup failures or remote errors other than a region not being served
   */
  @Override
  public CompactionState getCompactionState(TableName tableName,
    CompactType compactType) throws IOException {
    // Start from NONE; the branches below only ever raise it.
    AdminProtos.GetRegionInfoResponse.CompactionState state =
        AdminProtos.GetRegionInfoResponse.CompactionState.NONE;
    checkTableExists(tableName);
    // One controller instance is shared by every rpc issued from this method.
    PayloadCarryingRpcController controller = rpcControllerFactory.newController();
    switch (compactType) {
      case MOB:
        try {
          // The MOB region info is requested from the master itself.
          ServerName master = getMasterAddress();
          HRegionInfo info = getMobRegionInfo(tableName);
          GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
                  info.getRegionName(), true);
          GetRegionInfoResponse response = this.connection.getAdmin(master)
                  .getRegionInfo(controller, request);
          state = response.getCompactionState();
        } catch (ServiceException se) {
          throw ProtobufUtil.getRemoteException(se);
        }
        break;
      case NORMAL:
      default:
        // Only created for the meta table; closed in the finally block below.
        ZooKeeperWatcher zookeeper = null;
        try {
          List<Pair<HRegionInfo, ServerName>> pairs;
          if (TableName.META_TABLE_NAME.equals(tableName)) {
            // Meta region locations come from ZooKeeper rather than from a meta scan.
            zookeeper = new ZooKeeperWatcher(conf, ZK_IDENTIFIER_PREFIX + connection.toString(),
              new ThrowableAbortable());
            pairs = new MetaTableLocator().getMetaRegionsAndLocations(zookeeper);
          } else {
            pairs = MetaTableAccessor.getTableRegionsAndLocations(connection, tableName);
          }
          for (Pair<HRegionInfo, ServerName> pair : pairs) {
            // Skip regions that are offline or have no server assigned.
            if (pair.getFirst().isOffline()) continue;
            if (pair.getSecond() == null) continue;
            try {
              ServerName sn = pair.getSecond();
              AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
              GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
                      pair.getFirst().getRegionName(), true);
              GetRegionInfoResponse response = admin.getRegionInfo(controller, request);
              switch (response.getCompactionState()) {
                case MAJOR_AND_MINOR:
                  // Nothing can outrank this; stop scanning.
                  return CompactionState.MAJOR_AND_MINOR;
                case MAJOR:
                  // A minor compaction was already seen on an earlier region.
                  if (state == AdminProtos.GetRegionInfoResponse.CompactionState.MINOR) {
                    return CompactionState.MAJOR_AND_MINOR;
                  }
                  state = AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR;
                  break;
                case MINOR:
                  // A major compaction was already seen on an earlier region.
                  if (state == AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR) {
                    return CompactionState.MAJOR_AND_MINOR;
                  }
                  state = AdminProtos.GetRegionInfoResponse.CompactionState.MINOR;
                  break;
                case NONE:
                default: // nothing, continue
              }
            } catch (NotServingRegionException e) {
              // The region is not served there right now; log at debug and try the next one.
              if (LOG.isDebugEnabled()) {
                LOG.debug("Trying to get compaction state of " +
                        pair.getFirst() + ": " +
                        StringUtils.stringifyException(e));
              }
            } catch (RemoteException e) {
              // A remote NotServingRegionException is recognised by its class name in the
              // message and treated like the local one; anything else is rethrown.
              if (e.getMessage().indexOf(NotServingRegionException.class.getName()) >= 0) {
                if (LOG.isDebugEnabled()) {
                  LOG.debug("Trying to get compaction state of " + pair.getFirst() + ": "
                          + StringUtils.stringifyException(e));
                }
              } else {
                throw e;
              }
            }
          }
        } catch (ServiceException se) {
          throw ProtobufUtil.getRemoteException(se);
        } finally {
          if (zookeeper != null) {
            zookeeper.close();
          }
        }
        break;
    }
    // NOTE(review): state starts as NONE and is only reassigned to enum constants or to the
    // response value, so the null branch looks defensive - confirm the response getter never
    // returns null.
    if(state != null) {
      return ProtobufUtil.createCompactionState(state);
    }
    return null;
  }
3219
3220   /**
3221    * Future that waits on a procedure result.
3222    * Returned by the async version of the Admin calls,
3223    * and used internally by the sync calls to wait on the result of the procedure.
3224    */
3225   @InterfaceAudience.Private
3226   @InterfaceStability.Evolving
3227   protected static class ProcedureFuture<V> implements Future<V> {
3228     private ExecutionException exception = null;
3229     private boolean procResultFound = false;
3230     private boolean done = false;
3231     private boolean cancelled = false;
3232     private V result = null;
3233
3234     private final HBaseAdmin admin;
3235     private final Long procId;
3236
3237     public ProcedureFuture(final HBaseAdmin admin, final Long procId) {
3238       this.admin = admin;
3239       this.procId = procId;
3240     }
3241
3242     @Override
3243     public boolean cancel(boolean mayInterruptIfRunning) {
3244       AbortProcedureRequest abortProcRequest = AbortProcedureRequest.newBuilder()
3245           .setProcId(procId).setMayInterruptIfRunning(mayInterruptIfRunning).build();
3246       try {
3247         cancelled = abortProcedureResult(abortProcRequest).getIsProcedureAborted();
3248         if (cancelled) {
3249           done = true;
3250         }
3251       } catch (IOException e) {
3252         // Cancell thrown exception for some reason. At this time, we are not sure whether
3253         // the cancell succeeds or fails. We assume that it is failed, but print out a warning
3254         // for debugging purpose.
3255         LOG.warn(
3256           "Cancelling the procedure with procId=" + procId + " throws exception " + e.getMessage(),
3257           e);
3258         cancelled = false;
3259       }
3260       return cancelled;
3261     }
3262
3263     @Override
3264     public boolean isCancelled() {
3265       return cancelled;
3266     }
3267
3268     protected AbortProcedureResponse abortProcedureResult(
3269         final AbortProcedureRequest request) throws IOException {
3270       return admin.executeCallable(new MasterCallable<AbortProcedureResponse>(
3271           admin.getConnection()) {
3272         @Override
3273         public AbortProcedureResponse call(int callTimeout) throws ServiceException {
3274           PayloadCarryingRpcController controller = admin.getRpcControllerFactory().newController();
3275           controller.setCallTimeout(callTimeout);
3276           return master.abortProcedure(controller, request);
3277         }
3278       });
3279     }
3280
3281     @Override
3282     public V get() throws InterruptedException, ExecutionException {
3283       // TODO: should we ever spin forever?
3284       throw new UnsupportedOperationException();
3285     }
3286
3287     @Override
3288     public V get(long timeout, TimeUnit unit)
3289         throws InterruptedException, ExecutionException, TimeoutException {
3290       if (!done) {
3291         long deadlineTs = EnvironmentEdgeManager.currentTime() + unit.toMillis(timeout);
3292         try {
3293           try {
3294             // if the master support procedures, try to wait the result
3295             if (procId != null) {
3296               result = waitProcedureResult(procId, deadlineTs);
3297             }
3298             // if we don't have a proc result, try the compatibility wait
3299             if (!procResultFound) {
3300               result = waitOperationResult(deadlineTs);
3301             }
3302             result = postOperationResult(result, deadlineTs);
3303             done = true;
3304           } catch (IOException e) {
3305             result = postOperationFailure(e, deadlineTs);
3306             done = true;
3307           }
3308         } catch (IOException e) {
3309           exception = new ExecutionException(e);
3310           done = true;
3311         }
3312       }
3313       if (exception != null) {
3314         throw exception;
3315       }
3316       return result;
3317     }
3318
3319     @Override
3320     public boolean isDone() {
3321       return done;
3322     }
3323
3324     protected HBaseAdmin getAdmin() {
3325       return admin;
3326     }
3327
    /**
     * Polls the master for the result of a procedure until it stops running or the deadline
     * passes, sleeping {@code getAdmin().getPauseTime(tries)} between attempts.
     *
     * @param procId id of the procedure to poll
     * @param deadlineTs timestamp (as measured by EnvironmentEdgeManager) after which polling
     *        stops
     * @return the converted procedure result, or {@code null} when a DoNotRetryIOException
     *         makes the caller fall back to the compatibility wait
     * @throws IOException the last fetch failure seen, if any, once the deadline has passed, or
     *         the exception produced by {@code convertResult}
     * @throws TimeoutException if the deadline passes without any fetch failure recorded
     * @throws InterruptedException if the sleep between attempts is interrupted
     */
    private V waitProcedureResult(long procId, long deadlineTs)
        throws IOException, TimeoutException, InterruptedException {
      GetProcedureResultRequest request = GetProcedureResultRequest.newBuilder()
          .setProcId(procId)
          .build();

      int tries = 0;
      // NOTE(review): never reset after a later successful fetch, so a transient failure
      // followed by RUNNING responses is reported instead of a TimeoutException - confirm
      // this is intended (waitForState resets its equivalent on every iteration).
      IOException serviceEx = null;
      while (EnvironmentEdgeManager.currentTime() < deadlineTs) {
        GetProcedureResultResponse response = null;
        try {
          // Try to fetch the result
          response = getProcedureResult(request);
        } catch (IOException e) {
          serviceEx = unwrapException(e);

          // the master may be down
          LOG.warn("failed to get the procedure result procId=" + procId, serviceEx);

          // Not much to do, if we have a DoNotRetryIOException
          if (serviceEx instanceof DoNotRetryIOException) {
            // TODO: looks like there is no way to unwrap this exception and get the proper
            // UnsupportedOperationException aside from looking at the message.
            // anyway, if we fail here we just failover to the compatibility side
            // and that is always a valid solution.
            LOG.warn("Proc-v2 is unsupported on this master: " + serviceEx.getMessage(), serviceEx);
            procResultFound = false;
            return null;
          }
        }

        // If the procedure is no longer running, we should have a result
        if (response != null && response.getState() != GetProcedureResultResponse.State.RUNNING) {
          // NOT_FOUND leaves procResultFound false, which sends get() to the compatibility wait.
          procResultFound = response.getState() != GetProcedureResultResponse.State.NOT_FOUND;
          return convertResult(response);
        }

        try {
          Thread.sleep(getAdmin().getPauseTime(tries++));
        } catch (InterruptedException e) {
          // Rethrown with a message naming the procedure.
          throw new InterruptedException(
            "Interrupted while waiting for the result of proc " + procId);
        }
      }
      // Deadline reached: prefer reporting a recorded fetch failure over a plain timeout.
      if (serviceEx != null) {
        throw serviceEx;
      } else {
        throw new TimeoutException("The procedure " + procId + " is still running");
      }
    }
3378
3379     private static IOException unwrapException(IOException e) {
3380       if (e instanceof RemoteException) {
3381         return ((RemoteException)e).unwrapRemoteException();
3382       }
3383       return e;
3384     }
3385
3386     protected GetProcedureResultResponse getProcedureResult(final GetProcedureResultRequest request)
3387         throws IOException {
3388       return admin.executeCallable(new MasterCallable<GetProcedureResultResponse>(
3389           admin.getConnection()) {
3390         @Override
3391         public GetProcedureResultResponse call(int callTimeout) throws ServiceException {
3392           return master.getProcedureResult(null, request);
3393         }
3394       });
3395     }
3396
3397     /**
3398      * Convert the procedure result response to a specified type.
3399      * @param response the procedure result object to parse
3400      * @return the result data of the procedure.
3401      */
3402     protected V convertResult(final GetProcedureResultResponse response) throws IOException {
3403       if (response.hasException()) {
3404         throw ForeignExceptionUtil.toIOException(response.getException());
3405       }
3406       return null;
3407     }
3408
3409     /**
3410      * Fallback implementation in case the procedure is not supported by the server.
3411      * It should try to wait until the operation is completed.
3412      * @param deadlineTs the timestamp after which this method should throw a TimeoutException
3413      * @return the result data of the operation
3414      */
3415     protected V waitOperationResult(final long deadlineTs)
3416         throws IOException, TimeoutException {
3417       return null;
3418     }
3419
3420     /**
3421      * Called after the operation is completed and the result fetched. this allows to perform extra
3422      * steps after the procedure is completed. it allows to apply transformations to the result that
3423      * will be returned by get().
3424      * @param result the result of the procedure
3425      * @param deadlineTs the timestamp after which this method should throw a TimeoutException
3426      * @return the result of the procedure, which may be the same as the passed one
3427      */
3428     protected V postOperationResult(final V result, final long deadlineTs)
3429         throws IOException, TimeoutException {
3430       return result;
3431     }
3432
3433     /**
3434      * Called after the operation is terminated with a failure.
3435      * this allows to perform extra steps after the procedure is terminated.
3436      * it allows to apply transformations to the result that will be returned by get().
3437      * The default implementation will rethrow the exception
3438      * @param exception the exception got from fetching the result
3439      * @param deadlineTs the timestamp after which this method should throw a TimeoutException
3440      * @return the result of the procedure, which may be the same as the passed one
3441      */
3442     protected V postOperationFailure(final IOException exception, final long deadlineTs)
3443         throws IOException, TimeoutException {
3444       throw exception;
3445     }
3446
3447     protected interface WaitForStateCallable {
3448       boolean checkState(int tries) throws IOException;
3449       void throwInterruptedException() throws InterruptedIOException;
3450       void throwTimeoutException(long elapsed) throws TimeoutException;
3451     }
3452
3453     protected void waitForState(final long deadlineTs, final WaitForStateCallable callable)
3454         throws IOException, TimeoutException {
3455       int tries = 0;
3456       IOException serverEx = null;
3457       long startTime = EnvironmentEdgeManager.currentTime();
3458       while (EnvironmentEdgeManager.currentTime() < deadlineTs) {
3459         serverEx = null;
3460         try {
3461           if (callable.checkState(tries)) {
3462             return;
3463           }
3464         } catch (IOException e) {
3465           serverEx = e;
3466         }
3467         try {
3468           Thread.sleep(getAdmin().getPauseTime(tries++));
3469         } catch (InterruptedException e) {
3470           callable.throwInterruptedException();
3471         }
3472       }
3473       if (serverEx != null) {
3474         throw unwrapException(serverEx);
3475       } else {
3476         callable.throwTimeoutException(EnvironmentEdgeManager.currentTime() - startTime);
3477       }
3478     }
3479   }
3480
3481   @InterfaceAudience.Private
3482   @InterfaceStability.Evolving
3483   protected static abstract class TableFuture<V> extends ProcedureFuture<V> {
3484     private final TableName tableName;
3485
3486     public TableFuture(final HBaseAdmin admin, final TableName tableName, final Long procId) {
3487       super(admin, procId);
3488       this.tableName = tableName;
3489     }
3490
3491     @Override
3492     public String toString() {
3493       return getDescription();
3494     }
3495
3496     /**
3497      * @return the table name
3498      */
3499     protected TableName getTableName() {
3500       return tableName;
3501     }
3502
3503     /**
3504      * @return the table descriptor
3505      */
3506     protected HTableDescriptor getTableDescriptor() throws IOException {
3507       return getAdmin().getTableDescriptorByTableName(getTableName());
3508     }
3509
3510     /**
3511      * @return the operation type like CREATE, DELETE, DISABLE etc.
3512      */
3513     public abstract String getOperationType();
3514
3515     /**
3516      * @return a description of the operation
3517      */
3518     protected String getDescription() {
3519       return "Operation: " + getOperationType() + ", "
3520           + "Table Name: " + tableName.getNameWithNamespaceInclAsString();
3521
3522     };
3523
3524     protected abstract class TableWaitForStateCallable implements WaitForStateCallable {
3525       @Override
3526       public void throwInterruptedException() throws InterruptedIOException {
3527         throw new InterruptedIOException("Interrupted while waiting for operation: "
3528             + getOperationType() + " on table: " + tableName.getNameWithNamespaceInclAsString());
3529       }
3530
3531       @Override
3532       public void throwTimeoutException(long elapsedTime) throws TimeoutException {
3533         throw new TimeoutException("The operation: " + getOperationType() + " on table: " +
3534             tableName.getNameAsString() + " has not completed after " + elapsedTime + "ms");
3535       }
3536     }
3537
3538     @Override
3539     protected V postOperationResult(final V result, final long deadlineTs)
3540         throws IOException, TimeoutException {
3541       LOG.info(getDescription() + " completed");
3542       return super.postOperationResult(result, deadlineTs);
3543     }
3544
3545     @Override
3546     protected V postOperationFailure(final IOException exception, final long deadlineTs)
3547         throws IOException, TimeoutException {
3548       LOG.info(getDescription() + " failed with " + exception.getMessage());
3549       return super.postOperationFailure(exception, deadlineTs);
3550     }
3551
3552     protected void waitForTableEnabled(final long deadlineTs)
3553         throws IOException, TimeoutException {
3554       waitForState(deadlineTs, new TableWaitForStateCallable() {
3555         @Override
3556         public boolean checkState(int tries) throws IOException {
3557           try {
3558             if (getAdmin().isTableAvailable(tableName)) {
3559               return true;
3560             }
3561           } catch (TableNotFoundException tnfe) {
3562             LOG.debug("Table " + tableName.getNameWithNamespaceInclAsString()
3563                 + " was not enabled, sleeping. tries=" + tries);
3564           }
3565           return false;
3566         }
3567       });
3568     }
3569
3570     protected void waitForTableDisabled(final long deadlineTs)
3571         throws IOException, TimeoutException {
3572       waitForState(deadlineTs, new TableWaitForStateCallable() {
3573         @Override
3574         public boolean checkState(int tries) throws IOException {
3575           return getAdmin().isTableDisabled(tableName);
3576         }
3577       });
3578     }
3579
3580     protected void waitTableNotFound(final long deadlineTs)
3581         throws IOException, TimeoutException {
3582       waitForState(deadlineTs, new TableWaitForStateCallable() {
3583         @Override
3584         public boolean checkState(int tries) throws IOException {
3585           return !getAdmin().tableExists(tableName);
3586         }
3587       });
3588     }
3589
3590     protected void waitForSchemaUpdate(final long deadlineTs)
3591         throws IOException, TimeoutException {
3592       waitForState(deadlineTs, new TableWaitForStateCallable() {
3593         @Override
3594         public boolean checkState(int tries) throws IOException {
3595           return getAdmin().getAlterStatus(tableName).getFirst() == 0;
3596         }
3597       });
3598     }
3599
3600     protected void waitForAllRegionsOnline(final long deadlineTs, final byte[][] splitKeys)
3601         throws IOException, TimeoutException {
3602       final HTableDescriptor desc = getTableDescriptor();
3603       final AtomicInteger actualRegCount = new AtomicInteger(0);
3604       final MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
3605         @Override
3606         public boolean visit(Result rowResult) throws IOException {
3607           RegionLocations list = MetaTableAccessor.getRegionLocations(rowResult);
3608           if (list == null) {
3609             LOG.warn("No serialized HRegionInfo in " + rowResult);
3610             return true;
3611           }
3612           HRegionLocation l = list.getRegionLocation();
3613           if (l == null) {
3614             return true;
3615           }
3616           if (!l.getRegionInfo().getTable().equals(desc.getTableName())) {
3617             return false;
3618           }
3619           if (l.getRegionInfo().isOffline() || l.getRegionInfo().isSplit()) return true;
3620           HRegionLocation[] locations = list.getRegionLocations();
3621           for (HRegionLocation location : locations) {
3622             if (location == null) continue;
3623             ServerName serverName = location.getServerName();
3624             // Make sure that regions are assigned to server
3625             if (serverName != null && serverName.getHostAndPort() != null) {
3626               actualRegCount.incrementAndGet();
3627             }
3628           }
3629           return true;
3630         }
3631       };
3632
3633       int tries = 0;
3634       int numRegs = (splitKeys == null ? 1 : splitKeys.length + 1) * desc.getRegionReplication();
3635       while (EnvironmentEdgeManager.currentTime() < deadlineTs) {
3636         actualRegCount.set(0);
3637         MetaTableAccessor.scanMetaForTableRegions(getAdmin().getConnection(), visitor,
3638           desc.getTableName());
3639         if (actualRegCount.get() == numRegs) {
3640           // all the regions are online
3641           return;
3642         }
3643
3644         try {
3645           Thread.sleep(getAdmin().getPauseTime(tries++));
3646         } catch (InterruptedException e) {
3647           throw new InterruptedIOException("Interrupted when opening" + " regions; "
3648               + actualRegCount.get() + " of " + numRegs + " regions processed so far");
3649         }
3650       }
3651       throw new TimeoutException("Only " + actualRegCount.get() + " of " + numRegs
3652           + " regions are online; retries exhausted.");
3653     }
3654   }
3655
3656   @InterfaceAudience.Private
3657   @InterfaceStability.Evolving
3658   protected static abstract class NamespaceFuture extends ProcedureFuture<Void> {
3659     private final String namespaceName;
3660
3661     public NamespaceFuture(final HBaseAdmin admin, final String namespaceName, final Long procId) {
3662       super(admin, procId);
3663       this.namespaceName = namespaceName;
3664     }
3665
3666     /**
3667      * @return the namespace name
3668      */
3669     protected String getNamespaceName() {
3670       return namespaceName;
3671     }
3672
3673     /**
3674      * @return the operation type like CREATE_NAMESPACE, DELETE_NAMESPACE, etc.
3675      */
3676     public abstract String getOperationType();
3677
3678     @Override
3679     public String toString() {
3680       return "Operation: " + getOperationType() + ", Namespace: " + getNamespaceName();
3681     }
3682   }
3683
3684   @Override
3685   public List<SecurityCapability> getSecurityCapabilities() throws IOException {
3686     try {
3687       return executeCallable(new MasterCallable<List<SecurityCapability>>(getConnection()) {
3688         @Override
3689         public List<SecurityCapability> call(int callTimeout) throws ServiceException {
3690           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
3691           controller.setCallTimeout(callTimeout);
3692           SecurityCapabilitiesRequest req = SecurityCapabilitiesRequest.newBuilder().build();
3693           return ProtobufUtil.toSecurityCapabilityList(
3694             master.getSecurityCapabilities(controller, req).getCapabilitiesList());
3695         }
3696       });
3697     } catch (IOException e) {
3698       if (e instanceof RemoteException) {
3699         e = ((RemoteException)e).unwrapRemoteException();
3700       }
3701       throw e;
3702     }
3703   }
3704
3705   @Override
3706   public boolean[] setSplitOrMergeEnabled(final boolean enabled, final boolean synchronous,
3707     final boolean skipLock, final MasterSwitchType... switchTypes) throws IOException {
3708     return executeCallable(new MasterCallable<boolean[]>(getConnection()) {
3709       @Override
3710       public boolean[] call(int callTimeout) throws ServiceException {
3711         MasterProtos.SetSplitOrMergeEnabledResponse response = master.setSplitOrMergeEnabled(null,
3712           RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous,
3713             skipLock, switchTypes));
3714         boolean[] result = new boolean[switchTypes.length];
3715         int i = 0;
3716         for (Boolean prevValue : response.getPrevValueList()) {
3717           result[i++] = prevValue;
3718         }
3719         return result;
3720       }
3721     });
3722   }
3723
3724   @Override
3725   public boolean isSplitOrMergeEnabled(final MasterSwitchType switchType) throws IOException {
3726     return executeCallable(new MasterCallable<Boolean>(getConnection()) {
3727       @Override
3728       public Boolean call(int callTimeout) throws ServiceException {
3729         return master.isSplitOrMergeEnabled(null,
3730           RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType)).getEnabled();
3731       }
3732     });
3733   }
3734
3735   @Override
3736   public void releaseSplitOrMergeLockAndRollback() throws IOException {
3737     executeCallable(new MasterCallable<Void>(getConnection()) {
3738       @Override
3739       public Void call(int callTimeout) throws ServiceException {
3740         master.releaseSplitOrMergeLockAndRollback(null,
3741           RequestConverter.buildReleaseSplitOrMergeLockAndRollbackRequest());
3742         return null;
3743       }
3744     });
3745   }
3746
3747   private HRegionInfo getMobRegionInfo(TableName tableName) {
3748     return new HRegionInfo(tableName, Bytes.toBytes(".mob"),
3749             HConstants.EMPTY_END_ROW, false, 0);
3750   }
3751
3752   private RpcControllerFactory getRpcControllerFactory() {
3753     return rpcControllerFactory;
3754   }
3755 }