001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import com.google.protobuf.Descriptors;
021import com.google.protobuf.Message;
022import com.google.protobuf.RpcController;
023
024import java.io.Closeable;
025import java.io.IOException;
026import java.io.InterruptedIOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.EnumSet;
031import java.util.HashMap;
032import java.util.Iterator;
033import java.util.LinkedList;
034import java.util.List;
035import java.util.Map;
036import java.util.Set;
037import java.util.concurrent.Callable;
038import java.util.concurrent.ExecutionException;
039import java.util.concurrent.Future;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.TimeoutException;
042import java.util.concurrent.atomic.AtomicInteger;
043import java.util.concurrent.atomic.AtomicReference;
044import java.util.function.Supplier;
045import java.util.regex.Pattern;
046import java.util.stream.Collectors;
047import java.util.stream.Stream;
048
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.hbase.Abortable;
051import org.apache.hadoop.hbase.CacheEvictionStats;
052import org.apache.hadoop.hbase.CacheEvictionStatsBuilder;
053import org.apache.hadoop.hbase.ClusterMetrics;
054import org.apache.hadoop.hbase.ClusterMetrics.Option;
055import org.apache.hadoop.hbase.ClusterMetricsBuilder;
056import org.apache.hadoop.hbase.DoNotRetryIOException;
057import org.apache.hadoop.hbase.HBaseConfiguration;
058import org.apache.hadoop.hbase.HConstants;
059import org.apache.hadoop.hbase.HRegionInfo;
060import org.apache.hadoop.hbase.HRegionLocation;
061import org.apache.hadoop.hbase.HTableDescriptor;
062import org.apache.hadoop.hbase.MasterNotRunningException;
063import org.apache.hadoop.hbase.MetaTableAccessor;
064import org.apache.hadoop.hbase.NamespaceDescriptor;
065import org.apache.hadoop.hbase.NamespaceNotFoundException;
066import org.apache.hadoop.hbase.NotServingRegionException;
067import org.apache.hadoop.hbase.RegionLocations;
068import org.apache.hadoop.hbase.RegionMetrics;
069import org.apache.hadoop.hbase.RegionMetricsBuilder;
070import org.apache.hadoop.hbase.ServerName;
071import org.apache.hadoop.hbase.TableExistsException;
072import org.apache.hadoop.hbase.TableName;
073import org.apache.hadoop.hbase.TableNotDisabledException;
074import org.apache.hadoop.hbase.TableNotFoundException;
075import org.apache.hadoop.hbase.UnknownRegionException;
076import org.apache.hadoop.hbase.ZooKeeperConnectionException;
077import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
078import org.apache.hadoop.hbase.client.replication.TableCFs;
079import org.apache.hadoop.hbase.client.security.SecurityCapability;
080import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
081import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
082import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
083import org.apache.hadoop.hbase.ipc.HBaseRpcController;
084import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
085import org.apache.hadoop.hbase.quotas.QuotaFilter;
086import org.apache.hadoop.hbase.quotas.QuotaRetriever;
087import org.apache.hadoop.hbase.quotas.QuotaSettings;
088import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
089import org.apache.hadoop.hbase.replication.ReplicationException;
090import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
091import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
092import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
093import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
094import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
095import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
096import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
097import org.apache.hadoop.hbase.util.Addressing;
098import org.apache.hadoop.hbase.util.Bytes;
099import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
100import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
101import org.apache.hadoop.hbase.util.Pair;
102import org.apache.hadoop.ipc.RemoteException;
103import org.apache.hadoop.util.StringUtils;
104import org.apache.yetus.audience.InterfaceAudience;
105import org.apache.yetus.audience.InterfaceStability;
106import org.slf4j.Logger;
107import org.slf4j.LoggerFactory;
108
109import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
110import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
111import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
112import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
113import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
114import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
115import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
116import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
117import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
147import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
148import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
149import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
150import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
153import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
214
215/**
216 * HBaseAdmin is no longer a client API. It is marked InterfaceAudience.Private indicating that
217 * this is an HBase-internal class as defined in
218 * https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html
219 * There are no guarantees for backwards source / binary compatibility and methods or class can
220 * change or go away without deprecation.
221 * Use {@link Connection#getAdmin()} to obtain an instance of {@link Admin} instead of constructing
222 * an HBaseAdmin directly.
223 *
224 * <p>Connection should be an <i>unmanaged</i> connection obtained via
225 * {@link ConnectionFactory#createConnection(Configuration)}
226 *
227 * @see ConnectionFactory
228 * @see Connection
229 * @see Admin
230 */
231@InterfaceAudience.Private
232public class HBaseAdmin implements Admin {
233  private static final Logger LOG = LoggerFactory.getLogger(HBaseAdmin.class);
234
235  private ClusterConnection connection;
236
237  private final Configuration conf;
238  private final long pause;
239  private final int numRetries;
240  private final int syncWaitTimeout;
241  private boolean aborted;
242  private int operationTimeout;
243  private int rpcTimeout;
244  private int getProcedureTimeout;
245
246  private RpcRetryingCallerFactory rpcCallerFactory;
247  private RpcControllerFactory rpcControllerFactory;
248
249  private NonceGenerator ng;
250
251  @Override
252  public int getOperationTimeout() {
253    return operationTimeout;
254  }
255
256  HBaseAdmin(ClusterConnection connection) throws IOException {
257    this.conf = connection.getConfiguration();
258    this.connection = connection;
259
260    // TODO: receive ConnectionConfiguration here rather than re-parsing these configs every time.
261    this.pause = this.conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
262        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
263    this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
264        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
265    this.operationTimeout = this.conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
266        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
267    this.rpcTimeout = this.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
268        HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
269    this.syncWaitTimeout = this.conf.getInt(
270      "hbase.client.sync.wait.timeout.msec", 10 * 60000); // 10min
271    this.getProcedureTimeout =
272        this.conf.getInt("hbase.client.procedure.future.get.timeout.msec", 10 * 60000); // 10min
273
274    this.rpcCallerFactory = connection.getRpcRetryingCallerFactory();
275    this.rpcControllerFactory = connection.getRpcControllerFactory();
276
277    this.ng = this.connection.getNonceGenerator();
278  }
279
280  @Override
281  public void abort(String why, Throwable e) {
282    // Currently does nothing but throw the passed message and exception
283    this.aborted = true;
284    throw new RuntimeException(why, e);
285  }
286
287  @Override
288  public boolean isAborted() {
289    return this.aborted;
290  }
291
292  @Override
293  public boolean abortProcedure(final long procId, final boolean mayInterruptIfRunning)
294  throws IOException {
295    return get(abortProcedureAsync(procId, mayInterruptIfRunning), this.syncWaitTimeout,
296      TimeUnit.MILLISECONDS);
297  }
298
299  @Override
300  public Future<Boolean> abortProcedureAsync(final long procId, final boolean mayInterruptIfRunning)
301      throws IOException {
302    Boolean abortProcResponse =
303        executeCallable(new MasterCallable<AbortProcedureResponse>(getConnection(),
304            getRpcControllerFactory()) {
305      @Override
306      protected AbortProcedureResponse rpcCall() throws Exception {
307        AbortProcedureRequest abortProcRequest =
308            AbortProcedureRequest.newBuilder().setProcId(procId).build();
309        return master.abortProcedure(getRpcController(), abortProcRequest);
310      }
311    }).getIsProcedureAborted();
312    return new AbortProcedureFuture(this, procId, abortProcResponse);
313  }
314
315  @Override
316  public List<TableDescriptor> listTableDescriptors() throws IOException {
317    return listTableDescriptors((Pattern)null, false);
318  }
319
320  @Override
321  public List<TableDescriptor> listTableDescriptors(Pattern pattern) throws IOException {
322    return listTableDescriptors(pattern, false);
323  }
324
325  @Override
326  public List<TableDescriptor> listTableDescriptors(Pattern pattern, boolean includeSysTables)
327      throws IOException {
328    return executeCallable(new MasterCallable<List<TableDescriptor>>(getConnection(),
329        getRpcControllerFactory()) {
330      @Override
331      protected List<TableDescriptor> rpcCall() throws Exception {
332        GetTableDescriptorsRequest req =
333            RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables);
334        return ProtobufUtil.toTableDescriptorList(master.getTableDescriptors(getRpcController(),
335            req));
336      }
337    });
338  }
339
340  @Override
341  public TableDescriptor getDescriptor(TableName tableName)
342      throws TableNotFoundException, IOException {
343    return getTableDescriptor(tableName, getConnection(), rpcCallerFactory, rpcControllerFactory,
344       operationTimeout, rpcTimeout);
345  }
346
347  @Override
348  public void modifyTable(TableDescriptor td) throws IOException {
349    get(modifyTableAsync(td), syncWaitTimeout, TimeUnit.MILLISECONDS);
350  }
351
352  @Override
353  public Future<Void> modifyTableAsync(TableDescriptor td) throws IOException {
354    ModifyTableResponse response = executeCallable(
355      new MasterCallable<ModifyTableResponse>(getConnection(), getRpcControllerFactory()) {
356        Long nonceGroup = ng.getNonceGroup();
357        Long nonce = ng.newNonce();
358        @Override
359        protected ModifyTableResponse rpcCall() throws Exception {
360          setPriority(td.getTableName());
361          ModifyTableRequest request = RequestConverter.buildModifyTableRequest(
362            td.getTableName(), td, nonceGroup, nonce);
363          return master.modifyTable(getRpcController(), request);
364        }
365      });
366    return new ModifyTableFuture(this, td.getTableName(), response);
367  }
368
369  @Override
370  public List<TableDescriptor> listTableDescriptorsByNamespace(byte[] name) throws IOException {
371    return executeCallable(new MasterCallable<List<TableDescriptor>>(getConnection(),
372        getRpcControllerFactory()) {
373      @Override
374      protected List<TableDescriptor> rpcCall() throws Exception {
375        return master.listTableDescriptorsByNamespace(getRpcController(),
376                ListTableDescriptorsByNamespaceRequest.newBuilder()
377                  .setNamespaceName(Bytes.toString(name)).build())
378                .getTableSchemaList()
379                .stream()
380                .map(ProtobufUtil::toTableDescriptor)
381                .collect(Collectors.toList());
382      }
383    });
384  }
385
386  @Override
387  public List<TableDescriptor> listTableDescriptors(List<TableName> tableNames) throws IOException {
388    return executeCallable(new MasterCallable<List<TableDescriptor>>(getConnection(),
389        getRpcControllerFactory()) {
390      @Override
391      protected List<TableDescriptor> rpcCall() throws Exception {
392        GetTableDescriptorsRequest req =
393            RequestConverter.buildGetTableDescriptorsRequest(tableNames);
394          return ProtobufUtil.toTableDescriptorList(master.getTableDescriptors(getRpcController(),
395              req));
396      }
397    });
398  }
399
400  @Override
401  public List<RegionInfo> getRegions(final ServerName sn) throws IOException {
402    AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
403    // TODO: There is no timeout on this controller. Set one!
404    HBaseRpcController controller = rpcControllerFactory.newController();
405    return ProtobufUtil.getOnlineRegions(controller, admin);
406  }
407
408  @Override
409  public List<RegionInfo> getRegions(TableName tableName) throws IOException {
410    if (TableName.isMetaTableName(tableName)) {
411      return Arrays.asList(RegionInfoBuilder.FIRST_META_REGIONINFO);
412    } else {
413      return MetaTableAccessor.getTableRegions(connection, tableName, true);
414    }
415  }
416
417  private static class AbortProcedureFuture extends ProcedureFuture<Boolean> {
418    private boolean isAbortInProgress;
419
420    public AbortProcedureFuture(
421        final HBaseAdmin admin,
422        final Long procId,
423        final Boolean abortProcResponse) {
424      super(admin, procId);
425      this.isAbortInProgress = abortProcResponse;
426    }
427
428    @Override
429    public Boolean get(long timeout, TimeUnit unit)
430        throws InterruptedException, ExecutionException, TimeoutException {
431      if (!this.isAbortInProgress) {
432        return false;
433      }
434      super.get(timeout, unit);
435      return true;
436    }
437  }
438
439  /** @return Connection used by this object. */
440  @Override
441  public Connection getConnection() {
442    return connection;
443  }
444
445  @Override
446  public boolean tableExists(final TableName tableName) throws IOException {
447    return executeCallable(new RpcRetryingCallable<Boolean>() {
448      @Override
449      protected Boolean rpcCall(int callTimeout) throws Exception {
450        return MetaTableAccessor.tableExists(connection, tableName);
451      }
452    });
453  }
454
455  @Override
456  public HTableDescriptor[] listTables() throws IOException {
457    return listTables((Pattern)null, false);
458  }
459
460  @Override
461  public HTableDescriptor[] listTables(Pattern pattern) throws IOException {
462    return listTables(pattern, false);
463  }
464
465  @Override
466  public HTableDescriptor[] listTables(String regex) throws IOException {
467    return listTables(Pattern.compile(regex), false);
468  }
469
470  @Override
471  public HTableDescriptor[] listTables(final Pattern pattern, final boolean includeSysTables)
472      throws IOException {
473    return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection(),
474        getRpcControllerFactory()) {
475      @Override
476      protected HTableDescriptor[] rpcCall() throws Exception {
477        GetTableDescriptorsRequest req =
478            RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables);
479        return ProtobufUtil.toTableDescriptorList(master.getTableDescriptors(getRpcController(),
480                req)).stream().map(ImmutableHTableDescriptor::new).toArray(HTableDescriptor[]::new);
481      }
482    });
483  }
484
485  @Override
486  public HTableDescriptor[] listTables(String regex, boolean includeSysTables)
487      throws IOException {
488    return listTables(Pattern.compile(regex), includeSysTables);
489  }
490
491  @Override
492  public TableName[] listTableNames() throws IOException {
493    return listTableNames((Pattern)null, false);
494  }
495
496  @Override
497  public TableName[] listTableNames(Pattern pattern) throws IOException {
498    return listTableNames(pattern, false);
499  }
500
501  @Override
502  public TableName[] listTableNames(String regex) throws IOException {
503    return listTableNames(Pattern.compile(regex), false);
504  }
505
506  @Override
507  public TableName[] listTableNames(final Pattern pattern, final boolean includeSysTables)
508      throws IOException {
509    return executeCallable(new MasterCallable<TableName[]>(getConnection(),
510        getRpcControllerFactory()) {
511      @Override
512      protected TableName[] rpcCall() throws Exception {
513        GetTableNamesRequest req =
514            RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables);
515        return ProtobufUtil.getTableNameArray(master.getTableNames(getRpcController(), req)
516            .getTableNamesList());
517      }
518    });
519  }
520
521  @Override
522  public TableName[] listTableNames(final String regex, final boolean includeSysTables)
523      throws IOException {
524    return listTableNames(Pattern.compile(regex), includeSysTables);
525  }
526
527  @Override
528  public HTableDescriptor getTableDescriptor(final TableName tableName) throws IOException {
529    return getHTableDescriptor(tableName, getConnection(), rpcCallerFactory, rpcControllerFactory,
530       operationTimeout, rpcTimeout);
531  }
532
533  static TableDescriptor getTableDescriptor(final TableName tableName, Connection connection,
534      RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory,
535      int operationTimeout, int rpcTimeout) throws IOException {
536    if (tableName == null) return null;
537    TableDescriptor td =
538        executeCallable(new MasterCallable<TableDescriptor>(connection, rpcControllerFactory) {
539      @Override
540      protected TableDescriptor rpcCall() throws Exception {
541        GetTableDescriptorsRequest req =
542            RequestConverter.buildGetTableDescriptorsRequest(tableName);
543        GetTableDescriptorsResponse htds = master.getTableDescriptors(getRpcController(), req);
544        if (!htds.getTableSchemaList().isEmpty()) {
545          return ProtobufUtil.toTableDescriptor(htds.getTableSchemaList().get(0));
546        }
547        return null;
548      }
549    }, rpcCallerFactory, operationTimeout, rpcTimeout);
550    if (td != null) {
551      return td;
552    }
553    throw new TableNotFoundException(tableName.getNameAsString());
554  }
555
556  /**
557   * @deprecated since 2.0 version and will be removed in 3.0 version.
558   *             use {@link #getTableDescriptor(TableName,
559   *             Connection, RpcRetryingCallerFactory,RpcControllerFactory,int,int)}
560   */
561  @Deprecated
562  static HTableDescriptor getHTableDescriptor(final TableName tableName, Connection connection,
563      RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory,
564      int operationTimeout, int rpcTimeout) throws IOException {
565    if (tableName == null) {
566      return null;
567    }
568    HTableDescriptor htd =
569        executeCallable(new MasterCallable<HTableDescriptor>(connection, rpcControllerFactory) {
570          @Override
571          protected HTableDescriptor rpcCall() throws Exception {
572            GetTableDescriptorsRequest req =
573                RequestConverter.buildGetTableDescriptorsRequest(tableName);
574            GetTableDescriptorsResponse htds = master.getTableDescriptors(getRpcController(), req);
575            if (!htds.getTableSchemaList().isEmpty()) {
576              return new ImmutableHTableDescriptor(
577                  ProtobufUtil.toTableDescriptor(htds.getTableSchemaList().get(0)));
578            }
579            return null;
580          }
581        }, rpcCallerFactory, operationTimeout, rpcTimeout);
582    if (htd != null) {
583      return new ImmutableHTableDescriptor(htd);
584    }
585    throw new TableNotFoundException(tableName.getNameAsString());
586  }
587
588  private long getPauseTime(int tries) {
589    int triesCount = tries;
590    if (triesCount >= HConstants.RETRY_BACKOFF.length) {
591      triesCount = HConstants.RETRY_BACKOFF.length - 1;
592    }
593    return this.pause * HConstants.RETRY_BACKOFF[triesCount];
594  }
595
596  @Override
597  public void createTable(TableDescriptor desc)
598  throws IOException {
599    createTable(desc, null);
600  }
601
602  @Override
603  public void createTable(TableDescriptor desc, byte [] startKey,
604      byte [] endKey, int numRegions)
605  throws IOException {
606    if(numRegions < 3) {
607      throw new IllegalArgumentException("Must create at least three regions");
608    } else if(Bytes.compareTo(startKey, endKey) >= 0) {
609      throw new IllegalArgumentException("Start key must be smaller than end key");
610    }
611    if (numRegions == 3) {
612      createTable(desc, new byte[][]{startKey, endKey});
613      return;
614    }
615    byte [][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
616    if(splitKeys == null || splitKeys.length != numRegions - 1) {
617      throw new IllegalArgumentException("Unable to split key range into enough regions");
618    }
619    createTable(desc, splitKeys);
620  }
621
622  @Override
623  public void createTable(final TableDescriptor desc, byte [][] splitKeys)
624      throws IOException {
625    get(createTableAsync(desc, splitKeys), syncWaitTimeout, TimeUnit.MILLISECONDS);
626  }
627
628  @Override
629  public Future<Void> createTableAsync(final TableDescriptor desc, final byte[][] splitKeys)
630      throws IOException {
631    if (desc.getTableName() == null) {
632      throw new IllegalArgumentException("TableName cannot be null");
633    }
634    if (splitKeys != null && splitKeys.length > 0) {
635      Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
636      // Verify there are no duplicate split keys
637      byte[] lastKey = null;
638      for (byte[] splitKey : splitKeys) {
639        if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
640          throw new IllegalArgumentException(
641              "Empty split key must not be passed in the split keys.");
642        }
643        if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
644          throw new IllegalArgumentException("All split keys must be unique, " +
645            "found duplicate: " + Bytes.toStringBinary(splitKey) +
646            ", " + Bytes.toStringBinary(lastKey));
647        }
648        lastKey = splitKey;
649      }
650    }
651
652    CreateTableResponse response = executeCallable(
653      new MasterCallable<CreateTableResponse>(getConnection(), getRpcControllerFactory()) {
654        Long nonceGroup = ng.getNonceGroup();
655        Long nonce = ng.newNonce();
656        @Override
657        protected CreateTableResponse rpcCall() throws Exception {
658          setPriority(desc.getTableName());
659          CreateTableRequest request = RequestConverter.buildCreateTableRequest(
660            desc, splitKeys, nonceGroup, nonce);
661          return master.createTable(getRpcController(), request);
662        }
663      });
664    return new CreateTableFuture(this, desc, splitKeys, response);
665  }
666
667  private static class CreateTableFuture extends TableFuture<Void> {
668    private final TableDescriptor desc;
669    private final byte[][] splitKeys;
670
671    public CreateTableFuture(final HBaseAdmin admin, final TableDescriptor desc,
672        final byte[][] splitKeys, final CreateTableResponse response) {
673      super(admin, desc.getTableName(),
674              (response != null && response.hasProcId()) ? response.getProcId() : null);
675      this.splitKeys = splitKeys;
676      this.desc = desc;
677    }
678
679    @Override
680    protected TableDescriptor getTableDescriptor() {
681      return desc;
682    }
683
684    @Override
685    public String getOperationType() {
686      return "CREATE";
687    }
688
689    @Override
690    protected Void waitOperationResult(final long deadlineTs) throws IOException, TimeoutException {
691      waitForTableEnabled(deadlineTs);
692      waitForAllRegionsOnline(deadlineTs, splitKeys);
693      return null;
694    }
695  }
696
697  @Override
698  public void deleteTable(final TableName tableName) throws IOException {
699    get(deleteTableAsync(tableName), syncWaitTimeout, TimeUnit.MILLISECONDS);
700  }
701
702  @Override
703  public Future<Void> deleteTableAsync(final TableName tableName) throws IOException {
704    DeleteTableResponse response = executeCallable(
705      new MasterCallable<DeleteTableResponse>(getConnection(), getRpcControllerFactory()) {
706        Long nonceGroup = ng.getNonceGroup();
707        Long nonce = ng.newNonce();
708        @Override
709        protected DeleteTableResponse rpcCall() throws Exception {
710          setPriority(tableName);
711          DeleteTableRequest req =
712              RequestConverter.buildDeleteTableRequest(tableName, nonceGroup,nonce);
713          return master.deleteTable(getRpcController(), req);
714        }
715      });
716    return new DeleteTableFuture(this, tableName, response);
717  }
718
719  private static class DeleteTableFuture extends TableFuture<Void> {
720    public DeleteTableFuture(final HBaseAdmin admin, final TableName tableName,
721        final DeleteTableResponse response) {
722      super(admin, tableName,
723              (response != null && response.hasProcId()) ? response.getProcId() : null);
724    }
725
726    @Override
727    public String getOperationType() {
728      return "DELETE";
729    }
730
731    @Override
732    protected Void waitOperationResult(final long deadlineTs)
733        throws IOException, TimeoutException {
734      waitTableNotFound(deadlineTs);
735      return null;
736    }
737
738    @Override
739    protected Void postOperationResult(final Void result, final long deadlineTs)
740        throws IOException, TimeoutException {
741      // Delete cached information to prevent clients from using old locations
742      ((ClusterConnection) getAdmin().getConnection()).clearRegionCache(getTableName());
743      return super.postOperationResult(result, deadlineTs);
744    }
745  }
746
747  @Override
748  public HTableDescriptor[] deleteTables(String regex) throws IOException {
749    return deleteTables(Pattern.compile(regex));
750  }
751
752  /**
753   * Delete tables matching the passed in pattern and wait on completion.
754   *
755   * Warning: Use this method carefully, there is no prompting and the effect is
756   * immediate. Consider using {@link #listTables(java.util.regex.Pattern) } and
757   * {@link #deleteTable(TableName)}
758   *
759   * @param pattern The pattern to match table names against
760   * @return Table descriptors for tables that couldn't be deleted
761   * @throws IOException
762   */
763  @Override
764  public HTableDescriptor[] deleteTables(Pattern pattern) throws IOException {
765    List<HTableDescriptor> failed = new LinkedList<>();
766    for (HTableDescriptor table : listTables(pattern)) {
767      try {
768        deleteTable(table.getTableName());
769      } catch (IOException ex) {
770        LOG.info("Failed to delete table " + table.getTableName(), ex);
771        failed.add(table);
772      }
773    }
774    return failed.toArray(new HTableDescriptor[failed.size()]);
775  }
776
777  @Override
778  public void truncateTable(final TableName tableName, final boolean preserveSplits)
779      throws IOException {
780    get(truncateTableAsync(tableName, preserveSplits), syncWaitTimeout, TimeUnit.MILLISECONDS);
781  }
782
783  @Override
784  public Future<Void> truncateTableAsync(final TableName tableName, final boolean preserveSplits)
785      throws IOException {
786    TruncateTableResponse response =
787        executeCallable(new MasterCallable<TruncateTableResponse>(getConnection(),
788            getRpcControllerFactory()) {
789          Long nonceGroup = ng.getNonceGroup();
790          Long nonce = ng.newNonce();
791          @Override
792          protected TruncateTableResponse rpcCall() throws Exception {
793            setPriority(tableName);
794            LOG.info("Started truncating " + tableName);
795            TruncateTableRequest req = RequestConverter.buildTruncateTableRequest(
796              tableName, preserveSplits, nonceGroup, nonce);
797            return master.truncateTable(getRpcController(), req);
798          }
799        });
800    return new TruncateTableFuture(this, tableName, preserveSplits, response);
801  }
802
803  private static class TruncateTableFuture extends TableFuture<Void> {
804    private final boolean preserveSplits;
805
806    public TruncateTableFuture(final HBaseAdmin admin, final TableName tableName,
807        final boolean preserveSplits, final TruncateTableResponse response) {
808      super(admin, tableName,
809             (response != null && response.hasProcId()) ? response.getProcId() : null);
810      this.preserveSplits = preserveSplits;
811    }
812
813    @Override
814    public String getOperationType() {
815      return "TRUNCATE";
816    }
817
818    @Override
819    protected Void waitOperationResult(final long deadlineTs) throws IOException, TimeoutException {
820      waitForTableEnabled(deadlineTs);
821      // once the table is enabled, we know the operation is done. so we can fetch the splitKeys
822      byte[][] splitKeys = preserveSplits ? getAdmin().getTableSplits(getTableName()) : null;
823      waitForAllRegionsOnline(deadlineTs, splitKeys);
824      return null;
825    }
826  }
827
828  private byte[][] getTableSplits(final TableName tableName) throws IOException {
829    byte[][] splits = null;
830    try (RegionLocator locator = getConnection().getRegionLocator(tableName)) {
831      byte[][] startKeys = locator.getStartKeys();
832      if (startKeys.length == 1) {
833        return splits;
834      }
835      splits = new byte[startKeys.length - 1][];
836      for (int i = 1; i < startKeys.length; i++) {
837        splits[i - 1] = startKeys[i];
838      }
839    }
840    return splits;
841  }
842
843  @Override
844  public void enableTable(final TableName tableName)
845  throws IOException {
846    get(enableTableAsync(tableName), syncWaitTimeout, TimeUnit.MILLISECONDS);
847  }
848
849  @Override
850  public Future<Void> enableTableAsync(final TableName tableName) throws IOException {
851    TableName.isLegalFullyQualifiedTableName(tableName.getName());
852    EnableTableResponse response = executeCallable(
853      new MasterCallable<EnableTableResponse>(getConnection(), getRpcControllerFactory()) {
854        Long nonceGroup = ng.getNonceGroup();
855        Long nonce = ng.newNonce();
856        @Override
857        protected EnableTableResponse rpcCall() throws Exception {
858          setPriority(tableName);
859          LOG.info("Started enable of " + tableName);
860          EnableTableRequest req =
861              RequestConverter.buildEnableTableRequest(tableName, nonceGroup, nonce);
862          return master.enableTable(getRpcController(),req);
863        }
864      });
865    return new EnableTableFuture(this, tableName, response);
866  }
867
868  private static class EnableTableFuture extends TableFuture<Void> {
869    public EnableTableFuture(final HBaseAdmin admin, final TableName tableName,
870        final EnableTableResponse response) {
871      super(admin, tableName,
872              (response != null && response.hasProcId()) ? response.getProcId() : null);
873    }
874
875    @Override
876    public String getOperationType() {
877      return "ENABLE";
878    }
879
880    @Override
881    protected Void waitOperationResult(final long deadlineTs) throws IOException, TimeoutException {
882      waitForTableEnabled(deadlineTs);
883      return null;
884    }
885  }
886
887  @Override
888  public HTableDescriptor[] enableTables(String regex) throws IOException {
889    return enableTables(Pattern.compile(regex));
890  }
891
892  @Override
893  public HTableDescriptor[] enableTables(Pattern pattern) throws IOException {
894    List<HTableDescriptor> failed = new LinkedList<>();
895    for (HTableDescriptor table : listTables(pattern)) {
896      if (isTableDisabled(table.getTableName())) {
897        try {
898          enableTable(table.getTableName());
899        } catch (IOException ex) {
900          LOG.info("Failed to enable table " + table.getTableName(), ex);
901          failed.add(table);
902        }
903      }
904    }
905    return failed.toArray(new HTableDescriptor[failed.size()]);
906  }
907
908  @Override
909  public void disableTable(final TableName tableName)
910  throws IOException {
911    get(disableTableAsync(tableName), syncWaitTimeout, TimeUnit.MILLISECONDS);
912  }
913
914  @Override
915  public Future<Void> disableTableAsync(final TableName tableName) throws IOException {
916    TableName.isLegalFullyQualifiedTableName(tableName.getName());
917    DisableTableResponse response = executeCallable(
918      new MasterCallable<DisableTableResponse>(getConnection(), getRpcControllerFactory()) {
919        Long nonceGroup = ng.getNonceGroup();
920        Long nonce = ng.newNonce();
921        @Override
922        protected DisableTableResponse rpcCall() throws Exception {
923          setPriority(tableName);
924          LOG.info("Started disable of " + tableName);
925          DisableTableRequest req =
926              RequestConverter.buildDisableTableRequest(
927                tableName, nonceGroup, nonce);
928          return master.disableTable(getRpcController(), req);
929        }
930      });
931    return new DisableTableFuture(this, tableName, response);
932  }
933
934  private static class DisableTableFuture extends TableFuture<Void> {
935    public DisableTableFuture(final HBaseAdmin admin, final TableName tableName,
936        final DisableTableResponse response) {
937      super(admin, tableName,
938              (response != null && response.hasProcId()) ? response.getProcId() : null);
939    }
940
941    @Override
942    public String getOperationType() {
943      return "DISABLE";
944    }
945
946    @Override
947    protected Void waitOperationResult(long deadlineTs) throws IOException, TimeoutException {
948      waitForTableDisabled(deadlineTs);
949      return null;
950    }
951  }
952
953  @Override
954  public HTableDescriptor[] disableTables(String regex) throws IOException {
955    return disableTables(Pattern.compile(regex));
956  }
957
958  @Override
959  public HTableDescriptor[] disableTables(Pattern pattern) throws IOException {
960    List<HTableDescriptor> failed = new LinkedList<>();
961    for (HTableDescriptor table : listTables(pattern)) {
962      if (isTableEnabled(table.getTableName())) {
963        try {
964          disableTable(table.getTableName());
965        } catch (IOException ex) {
966          LOG.info("Failed to disable table " + table.getTableName(), ex);
967          failed.add(table);
968        }
969      }
970    }
971    return failed.toArray(new HTableDescriptor[failed.size()]);
972  }
973
974  @Override
975  public boolean isTableEnabled(final TableName tableName) throws IOException {
976    checkTableExists(tableName);
977    return executeCallable(new RpcRetryingCallable<Boolean>() {
978      @Override
979      protected Boolean rpcCall(int callTimeout) throws Exception {
980        TableState tableState = MetaTableAccessor.getTableState(getConnection(), tableName);
981        if (tableState == null) {
982          throw new TableNotFoundException(tableName);
983        }
984        return tableState.inStates(TableState.State.ENABLED);
985      }
986    });
987  }
988
989  @Override
990  public boolean isTableDisabled(TableName tableName) throws IOException {
991    checkTableExists(tableName);
992    return connection.isTableDisabled(tableName);
993  }
994
995  @Override
996  public boolean isTableAvailable(TableName tableName) throws IOException {
997    return connection.isTableAvailable(tableName, null);
998  }
999
1000  @Override
1001  public boolean isTableAvailable(TableName tableName, byte[][] splitKeys) throws IOException {
1002    return connection.isTableAvailable(tableName, splitKeys);
1003  }
1004
1005  @Override
1006  public Pair<Integer, Integer> getAlterStatus(final TableName tableName) throws IOException {
1007    return executeCallable(new MasterCallable<Pair<Integer, Integer>>(getConnection(),
1008        getRpcControllerFactory()) {
1009      @Override
1010      protected Pair<Integer, Integer> rpcCall() throws Exception {
1011        setPriority(tableName);
1012        GetSchemaAlterStatusRequest req = RequestConverter
1013            .buildGetSchemaAlterStatusRequest(tableName);
1014        GetSchemaAlterStatusResponse ret = master.getSchemaAlterStatus(getRpcController(), req);
1015        Pair<Integer, Integer> pair = new Pair<>(ret.getYetToUpdateRegions(),
1016            ret.getTotalRegions());
1017        return pair;
1018      }
1019    });
1020  }
1021
1022  @Override
1023  public Pair<Integer, Integer> getAlterStatus(final byte[] tableName) throws IOException {
1024    return getAlterStatus(TableName.valueOf(tableName));
1025  }
1026
1027  @Override
1028  public void addColumnFamily(final TableName tableName, final ColumnFamilyDescriptor columnFamily)
1029      throws IOException {
1030    get(addColumnFamilyAsync(tableName, columnFamily), syncWaitTimeout, TimeUnit.MILLISECONDS);
1031  }
1032
1033  @Override
1034  public Future<Void> addColumnFamilyAsync(final TableName tableName,
1035      final ColumnFamilyDescriptor columnFamily) throws IOException {
1036    AddColumnResponse response =
1037        executeCallable(new MasterCallable<AddColumnResponse>(getConnection(),
1038            getRpcControllerFactory()) {
1039          Long nonceGroup = ng.getNonceGroup();
1040          Long nonce = ng.newNonce();
1041          @Override
1042          protected AddColumnResponse rpcCall() throws Exception {
1043            setPriority(tableName);
1044            AddColumnRequest req =
1045                RequestConverter.buildAddColumnRequest(tableName, columnFamily, nonceGroup, nonce);
1046            return master.addColumn(getRpcController(), req);
1047          }
1048        });
1049    return new AddColumnFamilyFuture(this, tableName, response);
1050  }
1051
1052  private static class AddColumnFamilyFuture extends ModifyTableFuture {
1053    public AddColumnFamilyFuture(final HBaseAdmin admin, final TableName tableName,
1054        final AddColumnResponse response) {
1055      super(admin, tableName, (response != null && response.hasProcId()) ? response.getProcId()
1056          : null);
1057    }
1058
1059    @Override
1060    public String getOperationType() {
1061      return "ADD_COLUMN_FAMILY";
1062    }
1063  }
1064
1065  /**
1066   * {@inheritDoc}
1067   * @deprecated Since 2.0. Will be removed in 3.0. Use
1068   *     {@link #deleteColumnFamily(TableName, byte[])} instead.
1069   */
1070  @Override
1071  @Deprecated
1072  public void deleteColumn(final TableName tableName, final byte[] columnFamily)
1073  throws IOException {
1074    deleteColumnFamily(tableName, columnFamily);
1075  }
1076
1077  @Override
1078  public void deleteColumnFamily(final TableName tableName, final byte[] columnFamily)
1079      throws IOException {
1080    get(deleteColumnFamilyAsync(tableName, columnFamily), syncWaitTimeout, TimeUnit.MILLISECONDS);
1081  }
1082
1083  @Override
1084  public Future<Void> deleteColumnFamilyAsync(final TableName tableName, final byte[] columnFamily)
1085      throws IOException {
1086    DeleteColumnResponse response =
1087        executeCallable(new MasterCallable<DeleteColumnResponse>(getConnection(),
1088            getRpcControllerFactory()) {
1089          Long nonceGroup = ng.getNonceGroup();
1090          Long nonce = ng.newNonce();
1091          @Override
1092          protected DeleteColumnResponse rpcCall() throws Exception {
1093            setPriority(tableName);
1094            DeleteColumnRequest req =
1095                RequestConverter.buildDeleteColumnRequest(tableName, columnFamily,
1096                  nonceGroup, nonce);
1097            return master.deleteColumn(getRpcController(), req);
1098          }
1099        });
1100    return new DeleteColumnFamilyFuture(this, tableName, response);
1101  }
1102
1103  private static class DeleteColumnFamilyFuture extends ModifyTableFuture {
1104    public DeleteColumnFamilyFuture(final HBaseAdmin admin, final TableName tableName,
1105        final DeleteColumnResponse response) {
1106      super(admin, tableName, (response != null && response.hasProcId()) ? response.getProcId()
1107          : null);
1108    }
1109
1110    @Override
1111    public String getOperationType() {
1112      return "DELETE_COLUMN_FAMILY";
1113    }
1114  }
1115
1116  @Override
1117  public void modifyColumnFamily(final TableName tableName,
1118      final ColumnFamilyDescriptor columnFamily) throws IOException {
1119    get(modifyColumnFamilyAsync(tableName, columnFamily), syncWaitTimeout, TimeUnit.MILLISECONDS);
1120  }
1121
1122  @Override
1123  public Future<Void> modifyColumnFamilyAsync(final TableName tableName,
1124      final ColumnFamilyDescriptor columnFamily) throws IOException {
1125    ModifyColumnResponse response =
1126        executeCallable(new MasterCallable<ModifyColumnResponse>(getConnection(),
1127            getRpcControllerFactory()) {
1128          Long nonceGroup = ng.getNonceGroup();
1129          Long nonce = ng.newNonce();
1130          @Override
1131          protected ModifyColumnResponse rpcCall() throws Exception {
1132            setPriority(tableName);
1133            ModifyColumnRequest req =
1134                RequestConverter.buildModifyColumnRequest(tableName, columnFamily,
1135                  nonceGroup, nonce);
1136            return master.modifyColumn(getRpcController(), req);
1137          }
1138        });
1139    return new ModifyColumnFamilyFuture(this, tableName, response);
1140  }
1141
1142  private static class ModifyColumnFamilyFuture extends ModifyTableFuture {
1143    public ModifyColumnFamilyFuture(final HBaseAdmin admin, final TableName tableName,
1144        final ModifyColumnResponse response) {
1145      super(admin, tableName, (response != null && response.hasProcId()) ? response.getProcId()
1146          : null);
1147    }
1148
1149    @Override
1150    public String getOperationType() {
1151      return "MODIFY_COLUMN_FAMILY";
1152    }
1153  }
1154
1155  @Deprecated
1156  @Override
1157  public void closeRegion(final String regionName, final String unused) throws IOException {
1158    unassign(Bytes.toBytes(regionName), true);
1159  }
1160
1161  @Deprecated
1162  @Override
1163  public void closeRegion(final byte [] regionName, final String unused) throws IOException {
1164    unassign(regionName, true);
1165  }
1166
1167  @Deprecated
1168  @Override
1169  public boolean closeRegionWithEncodedRegionName(final String encodedRegionName,
1170      final String unused) throws IOException {
1171    unassign(Bytes.toBytes(encodedRegionName), true);
1172    return true;
1173  }
1174
1175  @Deprecated
1176  @Override
1177  public void closeRegion(final ServerName unused, final HRegionInfo hri) throws IOException {
1178    unassign(hri.getRegionName(), true);
1179  }
1180
1181  /**
1182   * @param sn
1183   * @return List of {@link HRegionInfo}.
1184   * @throws IOException
1185   * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0
1186   *             Use {@link #getRegions(ServerName)}.
1187   */
1188  @Deprecated
1189  @Override
1190  public List<HRegionInfo> getOnlineRegions(final ServerName sn) throws IOException {
1191    return getRegions(sn).stream().map(ImmutableHRegionInfo::new).collect(Collectors.toList());
1192  }
1193
1194  @Override
1195  public void flush(final TableName tableName) throws IOException {
1196    checkTableExists(tableName);
1197    if (isTableDisabled(tableName)) {
1198      LOG.info("Table is disabled: " + tableName.getNameAsString());
1199      return;
1200    }
1201    execProcedure("flush-table-proc", tableName.getNameAsString(), new HashMap<>());
1202  }
1203
1204  @Override
1205  public void flushRegion(final byte[] regionName) throws IOException {
1206    Pair<RegionInfo, ServerName> regionServerPair = getRegion(regionName);
1207    if (regionServerPair == null) {
1208      throw new IllegalArgumentException("Unknown regionname: " + Bytes.toStringBinary(regionName));
1209    }
1210    if (regionServerPair.getSecond() == null) {
1211      throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
1212    }
1213    final RegionInfo regionInfo = regionServerPair.getFirst();
1214    ServerName serverName = regionServerPair.getSecond();
1215    flush(this.connection.getAdmin(serverName), regionInfo);
1216  }
1217
1218  private void flush(AdminService.BlockingInterface admin, final RegionInfo info)
1219    throws IOException {
1220    ProtobufUtil.call(() -> {
1221      // TODO: There is no timeout on this controller. Set one!
1222      HBaseRpcController controller = rpcControllerFactory.newController();
1223      FlushRegionRequest request =
1224        RequestConverter.buildFlushRegionRequest(info.getRegionName());
1225      admin.flushRegion(controller, request);
1226      return null;
1227    });
1228  }
1229
1230  @Override
1231  public void flushRegionServer(ServerName serverName) throws IOException {
1232    for (RegionInfo region : getRegions(serverName)) {
1233      flush(this.connection.getAdmin(serverName), region);
1234    }
1235  }
1236
1237  /**
1238   * {@inheritDoc}
1239   */
1240  @Override
1241  public void compact(final TableName tableName)
1242    throws IOException {
1243    compact(tableName, null, false, CompactType.NORMAL);
1244  }
1245
1246  @Override
1247  public void compactRegion(final byte[] regionName)
1248    throws IOException {
1249    compactRegion(regionName, null, false);
1250  }
1251
1252  /**
1253   * {@inheritDoc}
1254   */
1255  @Override
1256  public void compact(final TableName tableName, final byte[] columnFamily)
1257    throws IOException {
1258    compact(tableName, columnFamily, false, CompactType.NORMAL);
1259  }
1260
1261  /**
1262   * {@inheritDoc}
1263   */
1264  @Override
1265  public void compactRegion(final byte[] regionName, final byte[] columnFamily)
1266    throws IOException {
1267    compactRegion(regionName, columnFamily, false);
1268  }
1269
1270  @Override
1271  public void compactRegionServer(final ServerName serverName) throws IOException {
1272    for (RegionInfo region : getRegions(serverName)) {
1273      compact(this.connection.getAdmin(serverName), region, false, null);
1274    }
1275  }
1276
1277  @Override
1278  public void majorCompactRegionServer(final ServerName serverName) throws IOException {
1279    for (RegionInfo region : getRegions(serverName)) {
1280      compact(this.connection.getAdmin(serverName), region, true, null);
1281    }
1282  }
1283
1284  @Override
1285  public void majorCompact(final TableName tableName)
1286  throws IOException {
1287    compact(tableName, null, true, CompactType.NORMAL);
1288  }
1289
1290  @Override
1291  public void majorCompactRegion(final byte[] regionName)
1292  throws IOException {
1293    compactRegion(regionName, null, true);
1294  }
1295
1296  /**
1297   * {@inheritDoc}
1298   */
1299  @Override
1300  public void majorCompact(final TableName tableName, final byte[] columnFamily)
1301  throws IOException {
1302    compact(tableName, columnFamily, true, CompactType.NORMAL);
1303  }
1304
1305  @Override
1306  public void majorCompactRegion(final byte[] regionName, final byte[] columnFamily)
1307  throws IOException {
1308    compactRegion(regionName, columnFamily, true);
1309  }
1310
1311  /**
1312   * Compact a table.
1313   * Asynchronous operation.
1314   *
1315   * @param tableName table or region to compact
1316   * @param columnFamily column family within a table or region
1317   * @param major True if we are to do a major compaction.
1318   * @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
1319   * @throws IOException if a remote or network exception occurs
1320   */
1321  private void compact(final TableName tableName, final byte[] columnFamily,final boolean major,
1322                       CompactType compactType) throws IOException {
1323    switch (compactType) {
1324      case MOB:
1325        compact(this.connection.getAdminForMaster(), RegionInfo.createMobRegionInfo(tableName),
1326            major, columnFamily);
1327        break;
1328      case NORMAL:
1329        checkTableExists(tableName);
1330        for (HRegionLocation loc :connection.locateRegions(tableName, false, false)) {
1331          ServerName sn = loc.getServerName();
1332          if (sn == null) {
1333            continue;
1334          }
1335          try {
1336            compact(this.connection.getAdmin(sn), loc.getRegion(), major, columnFamily);
1337          } catch (NotServingRegionException e) {
1338            if (LOG.isDebugEnabled()) {
1339              LOG.debug("Trying to" + (major ? " major" : "") + " compact " + loc.getRegion() +
1340                  ": " + StringUtils.stringifyException(e));
1341            }
1342          }
1343        }
1344        break;
1345      default:
1346        throw new IllegalArgumentException("Unknown compactType: " + compactType);
1347    }
1348  }
1349
1350  /**
1351   * Compact an individual region.
1352   * Asynchronous operation.
1353   *
1354   * @param regionName region to compact
1355   * @param columnFamily column family within a table or region
1356   * @param major True if we are to do a major compaction.
1357   * @throws IOException if a remote or network exception occurs
1358   * @throws InterruptedException
1359   */
1360  private void compactRegion(final byte[] regionName, final byte[] columnFamily,
1361      final boolean major) throws IOException {
1362    Pair<RegionInfo, ServerName> regionServerPair = getRegion(regionName);
1363    if (regionServerPair == null) {
1364      throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName));
1365    }
1366    if (regionServerPair.getSecond() == null) {
1367      throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
1368    }
1369    compact(this.connection.getAdmin(regionServerPair.getSecond()), regionServerPair.getFirst(),
1370      major, columnFamily);
1371  }
1372
1373  private void compact(AdminService.BlockingInterface admin, RegionInfo hri, boolean major,
1374      byte[] family) throws IOException {
1375    Callable<Void> callable = new Callable<Void>() {
1376      @Override
1377      public Void call() throws Exception {
1378        // TODO: There is no timeout on this controller. Set one!
1379        HBaseRpcController controller = rpcControllerFactory.newController();
1380        CompactRegionRequest request =
1381            RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family);
1382        admin.compactRegion(controller, request);
1383        return null;
1384      }
1385    };
1386    ProtobufUtil.call(callable);
1387  }
1388
1389  @Override
1390  public void move(final byte[] encodedRegionName, final byte[] destServerName) throws IOException {
1391    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
1392      @Override
1393      protected Void rpcCall() throws Exception {
1394        setPriority(encodedRegionName);
1395        MoveRegionRequest request =
1396            RequestConverter.buildMoveRegionRequest(encodedRegionName,
1397              destServerName != null ? ServerName.valueOf(Bytes.toString(destServerName)) : null);
1398        master.moveRegion(getRpcController(), request);
1399        return null;
1400      }
1401    });
1402  }
1403
1404  @Override
1405  public void assign(final byte [] regionName) throws MasterNotRunningException,
1406      ZooKeeperConnectionException, IOException {
1407    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
1408      @Override
1409      protected Void rpcCall() throws Exception {
1410        setPriority(regionName);
1411        AssignRegionRequest request =
1412            RequestConverter.buildAssignRegionRequest(getRegionName(regionName));
1413        master.assignRegion(getRpcController(), request);
1414        return null;
1415      }
1416    });
1417  }
1418
1419  @Override
1420  public void unassign(final byte [] regionName, final boolean force) throws IOException {
1421    final byte[] toBeUnassigned = getRegionName(regionName);
1422    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
1423      @Override
1424      protected Void rpcCall() throws Exception {
1425        setPriority(regionName);
1426        UnassignRegionRequest request =
1427            RequestConverter.buildUnassignRegionRequest(toBeUnassigned, force);
1428        master.unassignRegion(getRpcController(), request);
1429        return null;
1430      }
1431    });
1432  }
1433
1434  @Override
1435  public void offline(final byte [] regionName)
1436  throws IOException {
1437    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
1438      @Override
1439      protected Void rpcCall() throws Exception {
1440        setPriority(regionName);
1441        master.offlineRegion(getRpcController(),
1442            RequestConverter.buildOfflineRegionRequest(regionName));
1443        return null;
1444      }
1445    });
1446  }
1447
1448  @Override
1449  public boolean balancerSwitch(final boolean on, final boolean synchronous)
1450  throws IOException {
1451    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1452      @Override
1453      protected Boolean rpcCall() throws Exception {
1454        SetBalancerRunningRequest req =
1455            RequestConverter.buildSetBalancerRunningRequest(on, synchronous);
1456        return master.setBalancerRunning(getRpcController(), req).getPrevBalanceValue();
1457      }
1458    });
1459  }
1460
1461  @Override
1462  public boolean balance() throws IOException {
1463    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1464      @Override
1465      protected Boolean rpcCall() throws Exception {
1466        return master.balance(getRpcController(),
1467            RequestConverter.buildBalanceRequest(false)).getBalancerRan();
1468      }
1469    });
1470  }
1471
1472  @Override
1473  public boolean balance(final boolean force) throws IOException {
1474    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1475      @Override
1476      protected Boolean rpcCall() throws Exception {
1477        return master.balance(getRpcController(),
1478            RequestConverter.buildBalanceRequest(force)).getBalancerRan();
1479      }
1480    });
1481  }
1482
1483  @Override
1484  public boolean isBalancerEnabled() throws IOException {
1485    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1486      @Override
1487      protected Boolean rpcCall() throws Exception {
1488        return master.isBalancerEnabled(getRpcController(),
1489          RequestConverter.buildIsBalancerEnabledRequest()).getEnabled();
1490      }
1491    });
1492  }
1493
1494  /**
1495   * {@inheritDoc}
1496   */
1497  @Override
1498  public CacheEvictionStats clearBlockCache(final TableName tableName) throws IOException {
1499    checkTableExists(tableName);
1500    CacheEvictionStatsBuilder cacheEvictionStats = CacheEvictionStats.builder();
1501    List<Pair<RegionInfo, ServerName>> pairs =
1502      MetaTableAccessor.getTableRegionsAndLocations(connection, tableName);
1503    Map<ServerName, List<RegionInfo>> regionInfoByServerName =
1504        pairs.stream()
1505            .filter(pair -> !(pair.getFirst().isOffline()))
1506            .filter(pair -> pair.getSecond() != null)
1507            .collect(Collectors.groupingBy(pair -> pair.getSecond(),
1508                Collectors.mapping(pair -> pair.getFirst(), Collectors.toList())));
1509
1510    for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
1511      CacheEvictionStats stats = clearBlockCache(entry.getKey(), entry.getValue());
1512      cacheEvictionStats = cacheEvictionStats.append(stats);
1513      if (stats.getExceptionCount() > 0) {
1514        for (Map.Entry<byte[], Throwable> exception : stats.getExceptions().entrySet()) {
1515          LOG.debug("Failed to clear block cache for "
1516              + Bytes.toStringBinary(exception.getKey())
1517              + " on " + entry.getKey() + ": ", exception.getValue());
1518        }
1519      }
1520    }
1521    return cacheEvictionStats.build();
1522  }
1523
1524  private CacheEvictionStats clearBlockCache(final ServerName sn, final List<RegionInfo> hris)
1525      throws IOException {
1526    HBaseRpcController controller = rpcControllerFactory.newController();
1527    AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
1528    ClearRegionBlockCacheRequest request =
1529      RequestConverter.buildClearRegionBlockCacheRequest(hris);
1530    ClearRegionBlockCacheResponse response;
1531    try {
1532      response = admin.clearRegionBlockCache(controller, request);
1533      return ProtobufUtil.toCacheEvictionStats(response.getStats());
1534    } catch (ServiceException se) {
1535      throw ProtobufUtil.getRemoteException(se);
1536    }
1537  }
1538
1539  /**
1540   * Invoke region normalizer. Can NOT run for various reasons.  Check logs.
1541   *
1542   * @return True if region normalizer ran, false otherwise.
1543   */
1544  @Override
1545  public boolean normalize() throws IOException {
1546    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1547      @Override
1548      protected Boolean rpcCall() throws Exception {
1549        return master.normalize(getRpcController(),
1550            RequestConverter.buildNormalizeRequest()).getNormalizerRan();
1551      }
1552    });
1553  }
1554
1555  @Override
1556  public boolean isNormalizerEnabled() throws IOException {
1557    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1558      @Override
1559      protected Boolean rpcCall() throws Exception {
1560        return master.isNormalizerEnabled(getRpcController(),
1561          RequestConverter.buildIsNormalizerEnabledRequest()).getEnabled();
1562      }
1563    });
1564  }
1565
1566  @Override
1567  public boolean normalizerSwitch(final boolean on) throws IOException {
1568    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1569      @Override
1570      protected Boolean rpcCall() throws Exception {
1571        SetNormalizerRunningRequest req =
1572          RequestConverter.buildSetNormalizerRunningRequest(on);
1573        return master.setNormalizerRunning(getRpcController(), req).getPrevNormalizerValue();
1574      }
1575    });
1576  }
1577
1578  @Override
1579  public boolean catalogJanitorSwitch(final boolean enable) throws IOException {
1580    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1581      @Override
1582      protected Boolean rpcCall() throws Exception {
1583        return master.enableCatalogJanitor(getRpcController(),
1584          RequestConverter.buildEnableCatalogJanitorRequest(enable)).getPrevValue();
1585      }
1586    });
1587  }
1588
1589  @Override
1590  public int runCatalogJanitor() throws IOException {
1591    return executeCallable(new MasterCallable<Integer>(getConnection(), getRpcControllerFactory()) {
1592      @Override
1593      protected Integer rpcCall() throws Exception {
1594        return master.runCatalogScan(getRpcController(),
1595          RequestConverter.buildCatalogScanRequest()).getScanResult();
1596      }
1597    });
1598  }
1599
1600  @Override
1601  public boolean isCatalogJanitorEnabled() throws IOException {
1602    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1603      @Override
1604      protected Boolean rpcCall() throws Exception {
1605        return master.isCatalogJanitorEnabled(getRpcController(),
1606          RequestConverter.buildIsCatalogJanitorEnabledRequest()).getValue();
1607      }
1608    });
1609  }
1610
1611  @Override
1612  public boolean cleanerChoreSwitch(final boolean on) throws IOException {
1613    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1614      @Override public Boolean rpcCall() throws Exception {
1615        return master.setCleanerChoreRunning(getRpcController(),
1616            RequestConverter.buildSetCleanerChoreRunningRequest(on)).getPrevValue();
1617      }
1618    });
1619  }
1620
1621  @Override
1622  public boolean runCleanerChore() throws IOException {
1623    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1624      @Override public Boolean rpcCall() throws Exception {
1625        return master.runCleanerChore(getRpcController(),
1626            RequestConverter.buildRunCleanerChoreRequest()).getCleanerChoreRan();
1627      }
1628    });
1629  }
1630
1631  @Override
1632  public boolean isCleanerChoreEnabled() throws IOException {
1633    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1634      @Override public Boolean rpcCall() throws Exception {
1635        return master.isCleanerChoreEnabled(getRpcController(),
1636            RequestConverter.buildIsCleanerChoreEnabledRequest()).getValue();
1637      }
1638    });
1639  }
1640
1641  /**
1642   * Merge two regions. Synchronous operation.
1643   * Note: It is not feasible to predict the length of merge.
1644   *   Therefore, this is for internal testing only.
1645   * @param nameOfRegionA encoded or full name of region a
1646   * @param nameOfRegionB encoded or full name of region b
1647   * @param forcible true if do a compulsory merge, otherwise we will only merge
1648   *          two adjacent regions
1649   * @throws IOException
1650   */
1651  @VisibleForTesting
1652  public void mergeRegionsSync(
1653      final byte[] nameOfRegionA,
1654      final byte[] nameOfRegionB,
1655      final boolean forcible) throws IOException {
1656    get(
1657      mergeRegionsAsync(nameOfRegionA, nameOfRegionB, forcible),
1658      syncWaitTimeout,
1659      TimeUnit.MILLISECONDS);
1660  }
1661
1662  /**
1663   * Merge two regions. Asynchronous operation.
1664   * @param nameOfRegionA encoded or full name of region a
1665   * @param nameOfRegionB encoded or full name of region b
1666   * @param forcible true if do a compulsory merge, otherwise we will only merge
1667   *          two adjacent regions
1668   * @throws IOException
1669   * @deprecated Since 2.0. Will be removed in 3.0. Use
1670   *     {@link #mergeRegionsAsync(byte[], byte[], boolean)} instead.
1671   */
1672  @Deprecated
1673  @Override
1674  public void mergeRegions(final byte[] nameOfRegionA,
1675      final byte[] nameOfRegionB, final boolean forcible)
1676      throws IOException {
1677    mergeRegionsAsync(nameOfRegionA, nameOfRegionB, forcible);
1678  }
1679
1680  /**
1681   * Merge two regions. Asynchronous operation.
1682   * @param nameOfRegionA encoded or full name of region a
1683   * @param nameOfRegionB encoded or full name of region b
1684   * @param forcible true if do a compulsory merge, otherwise we will only merge
1685   *          two adjacent regions
1686   * @throws IOException
1687   */
1688  @Override
1689  public Future<Void> mergeRegionsAsync(
1690      final byte[] nameOfRegionA,
1691      final byte[] nameOfRegionB,
1692      final boolean forcible) throws IOException {
1693    byte[][] nameofRegionsToMerge = new byte[2][];
1694    nameofRegionsToMerge[0] = nameOfRegionA;
1695    nameofRegionsToMerge[1] = nameOfRegionB;
1696    return mergeRegionsAsync(nameofRegionsToMerge, forcible);
1697  }
1698
1699  /**
1700   * Merge two regions. Asynchronous operation.
1701   * @param nameofRegionsToMerge encoded or full name of daughter regions
1702   * @param forcible true if do a compulsory merge, otherwise we will only merge
1703   *          adjacent regions
1704   * @throws IOException
1705   */
1706  @Override
1707  public Future<Void> mergeRegionsAsync(
1708      final byte[][] nameofRegionsToMerge,
1709      final boolean forcible) throws IOException {
1710    assert(nameofRegionsToMerge.length >= 2);
1711    byte[][] encodedNameofRegionsToMerge = new byte[nameofRegionsToMerge.length][];
1712    for(int i = 0; i < nameofRegionsToMerge.length; i++) {
1713      encodedNameofRegionsToMerge[i] = HRegionInfo.isEncodedRegionName(nameofRegionsToMerge[i]) ?
1714          nameofRegionsToMerge[i] :
1715          Bytes.toBytes(HRegionInfo.encodeRegionName(nameofRegionsToMerge[i]));
1716    }
1717
1718    TableName tableName = null;
1719    Pair<RegionInfo, ServerName> pair;
1720
1721    for(int i = 0; i < nameofRegionsToMerge.length; i++) {
1722      pair = getRegion(nameofRegionsToMerge[i]);
1723
1724      if (pair != null) {
1725        if (pair.getFirst().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1726          throw new IllegalArgumentException ("Can't invoke merge on non-default regions directly");
1727        }
1728        if (tableName == null) {
1729          tableName = pair.getFirst().getTable();
1730        } else  if (!tableName.equals(pair.getFirst().getTable())) {
1731          throw new IllegalArgumentException ("Cannot merge regions from two different tables " +
1732              tableName + " and " + pair.getFirst().getTable());
1733        }
1734      } else {
1735        throw new UnknownRegionException (
1736          "Can't invoke merge on unknown region "
1737          + Bytes.toStringBinary(encodedNameofRegionsToMerge[i]));
1738      }
1739    }
1740
1741    MergeTableRegionsResponse response =
1742        executeCallable(new MasterCallable<MergeTableRegionsResponse>(getConnection(),
1743            getRpcControllerFactory()) {
1744          Long nonceGroup = ng.getNonceGroup();
1745          Long nonce = ng.newNonce();
1746      @Override
1747      protected MergeTableRegionsResponse rpcCall() throws Exception {
1748        MergeTableRegionsRequest request = RequestConverter
1749            .buildMergeTableRegionsRequest(
1750                encodedNameofRegionsToMerge,
1751                forcible,
1752                nonceGroup,
1753                nonce);
1754        return master.mergeTableRegions(getRpcController(), request);
1755      }
1756    });
1757    return new MergeTableRegionsFuture(this, tableName, response);
1758  }
1759
1760  private static class MergeTableRegionsFuture extends TableFuture<Void> {
1761    public MergeTableRegionsFuture(
1762        final HBaseAdmin admin,
1763        final TableName tableName,
1764        final MergeTableRegionsResponse response) {
1765      super(admin, tableName,
1766          (response != null && response.hasProcId()) ? response.getProcId() : null);
1767    }
1768
1769    public MergeTableRegionsFuture(
1770        final HBaseAdmin admin,
1771        final TableName tableName,
1772        final Long procId) {
1773      super(admin, tableName, procId);
1774    }
1775
1776    @Override
1777    public String getOperationType() {
1778      return "MERGE_REGIONS";
1779    }
1780  }
1781  /**
1782   * Split one region. Synchronous operation.
1783   * Note: It is not feasible to predict the length of split.
1784   *   Therefore, this is for internal testing only.
1785   * @param regionName encoded or full name of region
1786   * @param splitPoint key where region splits
1787   * @throws IOException
1788   */
1789  @VisibleForTesting
1790  public void splitRegionSync(byte[] regionName, byte[] splitPoint) throws IOException {
1791    splitRegionSync(regionName, splitPoint, syncWaitTimeout, TimeUnit.MILLISECONDS);
1792  }
1793
1794
1795  /**
1796   * Split one region. Synchronous operation.
1797   * @param regionName region to be split
1798   * @param splitPoint split point
1799   * @param timeout how long to wait on split
1800   * @param units time units
1801   * @throws IOException
1802   */
1803  public void splitRegionSync(byte[] regionName, byte[] splitPoint,
1804    final long timeout, final TimeUnit units) throws IOException {
1805    get(
1806        splitRegionAsync(regionName, splitPoint),
1807        timeout,
1808        units);
1809  }
1810
1811  @Override
1812  public Future<Void> splitRegionAsync(byte[] regionName, byte[] splitPoint)
1813      throws IOException {
1814    byte[] encodedNameofRegionToSplit = HRegionInfo.isEncodedRegionName(regionName) ?
1815        regionName : Bytes.toBytes(HRegionInfo.encodeRegionName(regionName));
1816    Pair<RegionInfo, ServerName> pair = getRegion(regionName);
1817    if (pair != null) {
1818      if (pair.getFirst() != null &&
1819          pair.getFirst().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1820        throw new IllegalArgumentException ("Can't invoke split on non-default regions directly");
1821      }
1822    } else {
1823      throw new UnknownRegionException (
1824          "Can't invoke merge on unknown region "
1825              + Bytes.toStringBinary(encodedNameofRegionToSplit));
1826    }
1827
1828    return splitRegionAsync(pair.getFirst(), splitPoint);
1829  }
1830
1831  Future<Void> splitRegionAsync(RegionInfo hri, byte[] splitPoint) throws IOException {
1832    TableName tableName = hri.getTable();
1833    if (hri.getStartKey() != null && splitPoint != null &&
1834        Bytes.compareTo(hri.getStartKey(), splitPoint) == 0) {
1835      throw new IOException("should not give a splitkey which equals to startkey!");
1836    }
1837
1838    SplitTableRegionResponse response = executeCallable(
1839        new MasterCallable<SplitTableRegionResponse>(getConnection(), getRpcControllerFactory()) {
1840          Long nonceGroup = ng.getNonceGroup();
1841          Long nonce = ng.newNonce();
1842          @Override
1843          protected SplitTableRegionResponse rpcCall() throws Exception {
1844            setPriority(tableName);
1845            SplitTableRegionRequest request = RequestConverter
1846                .buildSplitTableRegionRequest(hri, splitPoint, nonceGroup, nonce);
1847            return master.splitRegion(getRpcController(), request);
1848          }
1849        });
1850    return new SplitTableRegionFuture(this, tableName, response);
1851  }
1852
1853  private static class SplitTableRegionFuture extends TableFuture<Void> {
1854    public SplitTableRegionFuture(final HBaseAdmin admin,
1855        final TableName tableName,
1856        final SplitTableRegionResponse response) {
1857      super(admin, tableName,
1858          (response != null && response.hasProcId()) ? response.getProcId() : null);
1859    }
1860
1861    public SplitTableRegionFuture(
1862        final HBaseAdmin admin,
1863        final TableName tableName,
1864        final Long procId) {
1865      super(admin, tableName, procId);
1866    }
1867
1868    @Override
1869    public String getOperationType() {
1870      return "SPLIT_REGION";
1871    }
1872  }
1873
1874  @Override
1875  public void split(final TableName tableName) throws IOException {
1876    split(tableName, null);
1877  }
1878
1879  @Override
1880  public void splitRegion(final byte[] regionName) throws IOException {
1881    splitRegion(regionName, null);
1882  }
1883
1884  @Override
1885  public void split(final TableName tableName, final byte[] splitPoint) throws IOException {
1886    checkTableExists(tableName);
1887    for (HRegionLocation loc : connection.locateRegions(tableName, false, false)) {
1888      ServerName sn = loc.getServerName();
1889      if (sn == null) {
1890        continue;
1891      }
1892      RegionInfo r = loc.getRegion();
1893      // check for parents
1894      if (r.isSplitParent()) {
1895        continue;
1896      }
1897      // if a split point given, only split that particular region
1898      if (r.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID ||
1899          (splitPoint != null && !r.containsRow(splitPoint))) {
1900        continue;
1901      }
1902      // call out to master to do split now
1903      splitRegionAsync(r, splitPoint);
1904    }
1905  }
1906
1907  @Override
1908  public void splitRegion(final byte[] regionName, final byte [] splitPoint) throws IOException {
1909    Pair<RegionInfo, ServerName> regionServerPair = getRegion(regionName);
1910    if (regionServerPair == null) {
1911      throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName));
1912    }
1913    if (regionServerPair.getFirst() != null &&
1914        regionServerPair.getFirst().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1915      throw new IllegalArgumentException("Can't split replicas directly. "
1916          + "Replicas are auto-split when their primary is split.");
1917    }
1918    if (regionServerPair.getSecond() == null) {
1919      throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
1920    }
1921    splitRegionAsync(regionServerPair.getFirst(), splitPoint);
1922  }
1923
1924  @Override
1925  public void modifyTable(final TableName tableName, final TableDescriptor td)
1926      throws IOException {
1927    get(modifyTableAsync(tableName, td), syncWaitTimeout, TimeUnit.MILLISECONDS);
1928  }
1929
1930  @Override
1931  public Future<Void> modifyTableAsync(final TableName tableName, final TableDescriptor td)
1932      throws IOException {
1933    if (!tableName.equals(td.getTableName())) {
1934      throw new IllegalArgumentException("the specified table name '" + tableName +
1935        "' doesn't match with the HTD one: " + td.getTableName());
1936    }
1937    return modifyTableAsync(td);
1938  }
1939
1940  private static class ModifyTableFuture extends TableFuture<Void> {
1941    public ModifyTableFuture(final HBaseAdmin admin, final TableName tableName,
1942        final ModifyTableResponse response) {
1943      super(admin, tableName,
1944          (response != null && response.hasProcId()) ? response.getProcId() : null);
1945    }
1946
1947    public ModifyTableFuture(final HBaseAdmin admin, final TableName tableName, final Long procId) {
1948      super(admin, tableName, procId);
1949    }
1950
1951    @Override
1952    public String getOperationType() {
1953      return "MODIFY";
1954    }
1955
1956    @Override
1957    protected Void postOperationResult(final Void result, final long deadlineTs)
1958        throws IOException, TimeoutException {
1959      // The modify operation on the table is asynchronous on the server side irrespective
1960      // of whether Procedure V2 is supported or not. So, we wait in the client till
1961      // all regions get updated.
1962      waitForSchemaUpdate(deadlineTs);
1963      return result;
1964    }
1965  }
1966
1967  /**
1968   * @param regionName Name of a region.
1969   * @return a pair of HRegionInfo and ServerName if <code>regionName</code> is
1970   *  a verified region name (we call {@link
1971   *  MetaTableAccessor#getRegionLocation(Connection, byte[])}
1972   *  else null.
1973   * Throw IllegalArgumentException if <code>regionName</code> is null.
1974   * @throws IOException
1975   */
1976  Pair<RegionInfo, ServerName> getRegion(final byte[] regionName) throws IOException {
1977    if (regionName == null) {
1978      throw new IllegalArgumentException("Pass a table name or region name");
1979    }
1980    Pair<RegionInfo, ServerName> pair = MetaTableAccessor.getRegion(connection, regionName);
1981    if (pair == null) {
1982      final AtomicReference<Pair<RegionInfo, ServerName>> result = new AtomicReference<>(null);
1983      final String encodedName = Bytes.toString(regionName);
1984      MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
1985        @Override
1986        public boolean visit(Result data) throws IOException {
1987          RegionInfo info = MetaTableAccessor.getRegionInfo(data);
1988          if (info == null) {
1989            LOG.warn("No serialized HRegionInfo in " + data);
1990            return true;
1991          }
1992          RegionLocations rl = MetaTableAccessor.getRegionLocations(data);
1993          boolean matched = false;
1994          ServerName sn = null;
1995          if (rl != null) {
1996            for (HRegionLocation h : rl.getRegionLocations()) {
1997              if (h != null && encodedName.equals(h.getRegionInfo().getEncodedName())) {
1998                sn = h.getServerName();
1999                info = h.getRegionInfo();
2000                matched = true;
2001              }
2002            }
2003          }
2004          if (!matched) return true;
2005          result.set(new Pair<>(info, sn));
2006          return false; // found the region, stop
2007        }
2008      };
2009
2010      MetaTableAccessor.fullScanRegions(connection, visitor);
2011      pair = result.get();
2012    }
2013    return pair;
2014  }
2015
2016  /**
2017   * If the input is a region name, it is returned as is. If it's an
2018   * encoded region name, the corresponding region is found from meta
2019   * and its region name is returned. If we can't find any region in
2020   * meta matching the input as either region name or encoded region
2021   * name, the input is returned as is. We don't throw unknown
2022   * region exception.
2023   */
2024  private byte[] getRegionName(
2025      final byte[] regionNameOrEncodedRegionName) throws IOException {
2026    if (Bytes.equals(regionNameOrEncodedRegionName,
2027        HRegionInfo.FIRST_META_REGIONINFO.getRegionName())
2028          || Bytes.equals(regionNameOrEncodedRegionName,
2029            HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
2030      return HRegionInfo.FIRST_META_REGIONINFO.getRegionName();
2031    }
2032    byte[] tmp = regionNameOrEncodedRegionName;
2033    Pair<RegionInfo, ServerName> regionServerPair = getRegion(regionNameOrEncodedRegionName);
2034    if (regionServerPair != null && regionServerPair.getFirst() != null) {
2035      tmp = regionServerPair.getFirst().getRegionName();
2036    }
2037    return tmp;
2038  }
2039
2040  /**
2041   * Check if table exists or not
2042   * @param tableName Name of a table.
2043   * @return tableName instance
2044   * @throws IOException if a remote or network exception occurs.
2045   * @throws TableNotFoundException if table does not exist.
2046   */
2047  private TableName checkTableExists(final TableName tableName)
2048      throws IOException {
2049    return executeCallable(new RpcRetryingCallable<TableName>() {
2050      @Override
2051      protected TableName rpcCall(int callTimeout) throws Exception {
2052        if (!MetaTableAccessor.tableExists(connection, tableName)) {
2053          throw new TableNotFoundException(tableName);
2054        }
2055        return tableName;
2056      }
2057    });
2058  }
2059
2060  @Override
2061  public synchronized void shutdown() throws IOException {
2062    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
2063      @Override
2064      protected Void rpcCall() throws Exception {
2065        setPriority(HConstants.HIGH_QOS);
2066        master.shutdown(getRpcController(), ShutdownRequest.newBuilder().build());
2067        return null;
2068      }
2069    });
2070  }
2071
2072  @Override
2073  public synchronized void stopMaster() throws IOException {
2074    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
2075      @Override
2076      protected Void rpcCall() throws Exception {
2077        setPriority(HConstants.HIGH_QOS);
2078        master.stopMaster(getRpcController(), StopMasterRequest.newBuilder().build());
2079        return null;
2080      }
2081    });
2082  }
2083
2084  @Override
2085  public synchronized void stopRegionServer(final String hostnamePort)
2086  throws IOException {
2087    String hostname = Addressing.parseHostname(hostnamePort);
2088    int port = Addressing.parsePort(hostnamePort);
2089    final AdminService.BlockingInterface admin =
2090      this.connection.getAdmin(ServerName.valueOf(hostname, port, 0));
2091    // TODO: There is no timeout on this controller. Set one!
2092    HBaseRpcController controller = rpcControllerFactory.newController();
2093    controller.setPriority(HConstants.HIGH_QOS);
2094    StopServerRequest request = RequestConverter.buildStopServerRequest(
2095        "Called by admin client " + this.connection.toString());
2096    try {
2097      admin.stopServer(controller, request);
2098    } catch (Exception e) {
2099      throw ProtobufUtil.handleRemoteException(e);
2100    }
2101  }
2102
2103  @Override
2104  public boolean isMasterInMaintenanceMode() throws IOException {
2105    return executeCallable(new MasterCallable<IsInMaintenanceModeResponse>(getConnection(),
2106        this.rpcControllerFactory) {
2107      @Override
2108      protected IsInMaintenanceModeResponse rpcCall() throws Exception {
2109        return master.isMasterInMaintenanceMode(getRpcController(),
2110            IsInMaintenanceModeRequest.newBuilder().build());
2111      }
2112    }).getInMaintenanceMode();
2113  }
2114
2115  @Override
2116  public ClusterMetrics getClusterMetrics(EnumSet<Option> options) throws IOException {
2117    return executeCallable(new MasterCallable<ClusterMetrics>(getConnection(),
2118        this.rpcControllerFactory) {
2119      @Override
2120      protected ClusterMetrics rpcCall() throws Exception {
2121        GetClusterStatusRequest req = RequestConverter.buildGetClusterStatusRequest(options);
2122        return ClusterMetricsBuilder.toClusterMetrics(
2123          master.getClusterStatus(getRpcController(), req).getClusterStatus());
2124      }
2125    });
2126  }
2127
2128  @Override
2129  public List<RegionMetrics> getRegionMetrics(ServerName serverName, TableName tableName)
2130      throws IOException {
2131    AdminService.BlockingInterface admin = this.connection.getAdmin(serverName);
2132    HBaseRpcController controller = rpcControllerFactory.newController();
2133    AdminProtos.GetRegionLoadRequest request =
2134      RequestConverter.buildGetRegionLoadRequest(tableName);
2135    try {
2136      return admin.getRegionLoad(controller, request).getRegionLoadsList().stream()
2137        .map(RegionMetricsBuilder::toRegionMetrics).collect(Collectors.toList());
2138    } catch (ServiceException se) {
2139      throw ProtobufUtil.getRemoteException(se);
2140    }
2141  }
2142
2143  @Override
2144  public Configuration getConfiguration() {
2145    return this.conf;
2146  }
2147
2148  /**
2149   * Do a get with a timeout against the passed in <code>future</code>.
2150   */
2151  private static <T> T get(final Future<T> future, final long timeout, final TimeUnit units)
2152  throws IOException {
2153    try {
2154      // TODO: how long should we wait? Spin forever?
2155      return future.get(timeout, units);
2156    } catch (InterruptedException e) {
2157      throw new InterruptedIOException("Interrupt while waiting on " + future);
2158    } catch (TimeoutException e) {
2159      throw new TimeoutIOException(e);
2160    } catch (ExecutionException e) {
2161      if (e.getCause() instanceof IOException) {
2162        throw (IOException)e.getCause();
2163      } else {
2164        throw new IOException(e.getCause());
2165      }
2166    }
2167  }
2168
2169  @Override
2170  public void createNamespace(final NamespaceDescriptor descriptor)
2171  throws IOException {
2172    get(createNamespaceAsync(descriptor), this.syncWaitTimeout, TimeUnit.MILLISECONDS);
2173  }
2174
2175  @Override
2176  public Future<Void> createNamespaceAsync(final NamespaceDescriptor descriptor)
2177      throws IOException {
2178    CreateNamespaceResponse response =
2179        executeCallable(new MasterCallable<CreateNamespaceResponse>(getConnection(),
2180            getRpcControllerFactory()) {
2181      @Override
2182      protected CreateNamespaceResponse rpcCall() throws Exception {
2183        return master.createNamespace(getRpcController(),
2184          CreateNamespaceRequest.newBuilder().setNamespaceDescriptor(ProtobufUtil.
2185              toProtoNamespaceDescriptor(descriptor)).build());
2186      }
2187    });
2188    return new NamespaceFuture(this, descriptor.getName(), response.getProcId()) {
2189      @Override
2190      public String getOperationType() {
2191        return "CREATE_NAMESPACE";
2192      }
2193    };
2194  }
2195
2196  @Override
2197  public void modifyNamespace(final NamespaceDescriptor descriptor)
2198  throws IOException {
2199    get(modifyNamespaceAsync(descriptor), this.syncWaitTimeout, TimeUnit.MILLISECONDS);
2200  }
2201
2202  @Override
2203  public Future<Void> modifyNamespaceAsync(final NamespaceDescriptor descriptor)
2204      throws IOException {
2205    ModifyNamespaceResponse response =
2206        executeCallable(new MasterCallable<ModifyNamespaceResponse>(getConnection(),
2207            getRpcControllerFactory()) {
2208      @Override
2209      protected ModifyNamespaceResponse rpcCall() throws Exception {
2210        // TODO: set priority based on NS?
2211        return master.modifyNamespace(getRpcController(), ModifyNamespaceRequest.newBuilder().
2212          setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build());
2213       }
2214    });
2215    return new NamespaceFuture(this, descriptor.getName(), response.getProcId()) {
2216      @Override
2217      public String getOperationType() {
2218        return "MODIFY_NAMESPACE";
2219      }
2220    };
2221  }
2222
2223  @Override
2224  public void deleteNamespace(final String name)
2225  throws IOException {
2226    get(deleteNamespaceAsync(name), this.syncWaitTimeout, TimeUnit.MILLISECONDS);
2227  }
2228
2229  @Override
2230  public Future<Void> deleteNamespaceAsync(final String name)
2231      throws IOException {
2232    DeleteNamespaceResponse response =
2233        executeCallable(new MasterCallable<DeleteNamespaceResponse>(getConnection(),
2234            getRpcControllerFactory()) {
2235      @Override
2236      protected DeleteNamespaceResponse rpcCall() throws Exception {
2237        // TODO: set priority based on NS?
2238        return master.deleteNamespace(getRpcController(), DeleteNamespaceRequest.newBuilder().
2239          setNamespaceName(name).build());
2240        }
2241      });
2242    return new NamespaceFuture(this, name, response.getProcId()) {
2243      @Override
2244      public String getOperationType() {
2245        return "DELETE_NAMESPACE";
2246      }
2247    };
2248  }
2249
2250  @Override
2251  public NamespaceDescriptor getNamespaceDescriptor(final String name)
2252      throws NamespaceNotFoundException, IOException {
2253    return executeCallable(new MasterCallable<NamespaceDescriptor>(getConnection(),
2254        getRpcControllerFactory()) {
2255      @Override
2256      protected NamespaceDescriptor rpcCall() throws Exception {
2257        return ProtobufUtil.toNamespaceDescriptor(
2258            master.getNamespaceDescriptor(getRpcController(),
2259                GetNamespaceDescriptorRequest.newBuilder().
2260                  setNamespaceName(name).build()).getNamespaceDescriptor());
2261      }
2262    });
2263  }
2264
2265  @Override
2266  public NamespaceDescriptor[] listNamespaceDescriptors() throws IOException {
2267    return executeCallable(new MasterCallable<NamespaceDescriptor[]>(getConnection(),
2268        getRpcControllerFactory()) {
2269      @Override
2270      protected NamespaceDescriptor[] rpcCall() throws Exception {
2271        List<HBaseProtos.NamespaceDescriptor> list =
2272            master.listNamespaceDescriptors(getRpcController(),
2273              ListNamespaceDescriptorsRequest.newBuilder().build()).getNamespaceDescriptorList();
2274        NamespaceDescriptor[] res = new NamespaceDescriptor[list.size()];
2275        for(int i = 0; i < list.size(); i++) {
2276          res[i] = ProtobufUtil.toNamespaceDescriptor(list.get(i));
2277        }
2278        return res;
2279      }
2280    });
2281  }
2282
2283  @Override
2284  public String getProcedures() throws IOException {
2285    return executeCallable(new MasterCallable<String>(getConnection(),
2286        getRpcControllerFactory()) {
2287      @Override
2288      protected String rpcCall() throws Exception {
2289        GetProceduresRequest request = GetProceduresRequest.newBuilder().build();
2290        GetProceduresResponse response = master.getProcedures(getRpcController(), request);
2291        return ProtobufUtil.toProcedureJson(response.getProcedureList());
2292      }
2293    });
2294  }
2295
2296  @Override
2297  public String getLocks() throws IOException {
2298    return executeCallable(new MasterCallable<String>(getConnection(),
2299        getRpcControllerFactory()) {
2300      @Override
2301      protected String rpcCall() throws Exception {
2302        GetLocksRequest request = GetLocksRequest.newBuilder().build();
2303        GetLocksResponse response = master.getLocks(getRpcController(), request);
2304        return ProtobufUtil.toLockJson(response.getLockList());
2305      }
2306    });
2307  }
2308
2309  @Override
2310  public HTableDescriptor[] listTableDescriptorsByNamespace(final String name) throws IOException {
2311    return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection(),
2312        getRpcControllerFactory()) {
2313      @Override
2314      protected HTableDescriptor[] rpcCall() throws Exception {
2315        List<TableSchema> list =
2316            master.listTableDescriptorsByNamespace(getRpcController(),
2317                ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name)
2318                .build()).getTableSchemaList();
2319        HTableDescriptor[] res = new HTableDescriptor[list.size()];
2320        for(int i=0; i < list.size(); i++) {
2321          res[i] = new ImmutableHTableDescriptor(ProtobufUtil.toTableDescriptor(list.get(i)));
2322        }
2323        return res;
2324      }
2325    });
2326  }
2327
2328  @Override
2329  public TableName[] listTableNamesByNamespace(final String name) throws IOException {
2330    return executeCallable(new MasterCallable<TableName[]>(getConnection(),
2331        getRpcControllerFactory()) {
2332      @Override
2333      protected TableName[] rpcCall() throws Exception {
2334        List<HBaseProtos.TableName> tableNames =
2335            master.listTableNamesByNamespace(getRpcController(), ListTableNamesByNamespaceRequest.
2336                newBuilder().setNamespaceName(name).build())
2337            .getTableNameList();
2338        TableName[] result = new TableName[tableNames.size()];
2339        for (int i = 0; i < tableNames.size(); i++) {
2340          result[i] = ProtobufUtil.toTableName(tableNames.get(i));
2341        }
2342        return result;
2343      }
2344    });
2345  }
2346
2347  /**
2348   * Is HBase available? Throw an exception if not.
2349   * @param conf system configuration
2350   * @throws MasterNotRunningException if the master is not running.
2351   * @throws ZooKeeperConnectionException if unable to connect to zookeeper. // TODO do not expose
2352   *           ZKConnectionException.
2353   */
2354  public static void available(final Configuration conf)
2355      throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
2356    Configuration copyOfConf = HBaseConfiguration.create(conf);
2357    // We set it to make it fail as soon as possible if HBase is not available
2358    copyOfConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
2359    copyOfConf.setInt("zookeeper.recovery.retry", 0);
2360
2361    // Check ZK first.
2362    // If the connection exists, we may have a connection to ZK that does not work anymore
2363    try (ClusterConnection connection =
2364        (ClusterConnection) ConnectionFactory.createConnection(copyOfConf)) {
2365      // can throw MasterNotRunningException
2366      connection.isMasterRunning();
2367    }
2368  }
2369
2370  /**
2371   *
2372   * @param tableName
2373   * @return List of {@link HRegionInfo}.
2374   * @throws IOException
2375   * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0
2376   *             Use {@link #getRegions(TableName)}.
2377   */
2378  @Deprecated
2379  @Override
2380  public List<HRegionInfo> getTableRegions(final TableName tableName)
2381    throws IOException {
2382    return getRegions(tableName).stream()
2383        .map(ImmutableHRegionInfo::new)
2384        .collect(Collectors.toList());
2385  }
2386
2387  @Override
2388  public synchronized void close() throws IOException {
2389  }
2390
2391  @Override
2392  public HTableDescriptor[] getTableDescriptorsByTableName(final List<TableName> tableNames)
2393  throws IOException {
2394    return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection(),
2395        getRpcControllerFactory()) {
2396      @Override
2397      protected HTableDescriptor[] rpcCall() throws Exception {
2398        GetTableDescriptorsRequest req =
2399            RequestConverter.buildGetTableDescriptorsRequest(tableNames);
2400        return ProtobufUtil
2401            .toTableDescriptorList(master.getTableDescriptors(getRpcController(), req)).stream()
2402            .map(ImmutableHTableDescriptor::new).toArray(HTableDescriptor[]::new);
2403      }
2404    });
2405  }
2406
2407  @Override
2408  public HTableDescriptor[] getTableDescriptors(List<String> names)
2409  throws IOException {
2410    List<TableName> tableNames = new ArrayList<>(names.size());
2411    for(String name : names) {
2412      tableNames.add(TableName.valueOf(name));
2413    }
2414    return getTableDescriptorsByTableName(tableNames);
2415  }
2416
2417  private RollWALWriterResponse rollWALWriterImpl(final ServerName sn) throws IOException,
2418      FailedLogCloseException {
2419    final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
2420    RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest();
2421    // TODO: There is no timeout on this controller. Set one!
2422    HBaseRpcController controller = rpcControllerFactory.newController();
2423    try {
2424      return admin.rollWALWriter(controller, request);
2425    } catch (ServiceException e) {
2426      throw ProtobufUtil.handleRemoteException(e);
2427    }
2428  }
2429
2430  /**
2431   * Roll the log writer. I.e. when using a file system based write ahead log,
2432   * start writing log messages to a new file.
2433   *
2434   * Note that when talking to a version 1.0+ HBase deployment, the rolling is asynchronous.
2435   * This method will return as soon as the roll is requested and the return value will
2436   * always be null. Additionally, the named region server may schedule store flushes at the
2437   * request of the wal handling the roll request.
2438   *
2439   * When talking to a 0.98 or older HBase deployment, the rolling is synchronous and the
2440   * return value may be either null or a list of encoded region names.
2441   *
2442   * @param serverName
2443   *          The servername of the regionserver. A server name is made of host,
2444   *          port and startcode. This is mandatory. Here is an example:
2445   *          <code> host187.example.com,60020,1289493121758</code>
2446   * @return a set of {@link HRegionInfo#getEncodedName()} that would allow the wal to
2447   *         clean up some underlying files. null if there's nothing to flush.
2448   * @throws IOException if a remote or network exception occurs
2449   * @throws FailedLogCloseException
2450   * @deprecated use {@link #rollWALWriter(ServerName)}
2451   */
2452  @Deprecated
2453  public synchronized byte[][] rollHLogWriter(String serverName)
2454      throws IOException, FailedLogCloseException {
2455    ServerName sn = ServerName.valueOf(serverName);
2456    final RollWALWriterResponse response = rollWALWriterImpl(sn);
2457    int regionCount = response.getRegionToFlushCount();
2458    if (0 == regionCount) {
2459      return null;
2460    }
2461    byte[][] regionsToFlush = new byte[regionCount][];
2462    for (int i = 0; i < regionCount; i++) {
2463      regionsToFlush[i] = ProtobufUtil.toBytes(response.getRegionToFlush(i));
2464    }
2465    return regionsToFlush;
2466  }
2467
2468  @Override
2469  public synchronized void rollWALWriter(ServerName serverName)
2470      throws IOException, FailedLogCloseException {
2471    rollWALWriterImpl(serverName);
2472  }
2473
2474  @Override
2475  public CompactionState getCompactionState(final TableName tableName)
2476  throws IOException {
2477    return getCompactionState(tableName, CompactType.NORMAL);
2478  }
2479
2480  @Override
2481  public CompactionState getCompactionStateForRegion(final byte[] regionName)
2482  throws IOException {
2483    final Pair<RegionInfo, ServerName> regionServerPair = getRegion(regionName);
2484    if (regionServerPair == null) {
2485      throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName));
2486    }
2487    if (regionServerPair.getSecond() == null) {
2488      throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
2489    }
2490    ServerName sn = regionServerPair.getSecond();
2491    final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
2492    // TODO: There is no timeout on this controller. Set one!
2493    HBaseRpcController controller = rpcControllerFactory.newController();
2494    GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
2495      regionServerPair.getFirst().getRegionName(), true);
2496    GetRegionInfoResponse response;
2497    try {
2498      response = admin.getRegionInfo(controller, request);
2499    } catch (ServiceException e) {
2500      throw ProtobufUtil.handleRemoteException(e);
2501    }
2502    if (response.getCompactionState() != null) {
2503      return ProtobufUtil.createCompactionState(response.getCompactionState());
2504    }
2505    return null;
2506  }
2507
2508  @Override
2509  public void snapshot(final String snapshotName,
2510                       final TableName tableName) throws IOException,
2511      SnapshotCreationException, IllegalArgumentException {
2512    snapshot(snapshotName, tableName, SnapshotType.FLUSH);
2513  }
2514
2515  @Override
2516  public void snapshot(final byte[] snapshotName, final TableName tableName)
2517      throws IOException, SnapshotCreationException, IllegalArgumentException {
2518    snapshot(Bytes.toString(snapshotName), tableName, SnapshotType.FLUSH);
2519  }
2520
2521  @Override
2522  public void snapshot(final String snapshotName, final TableName tableName,
2523      SnapshotType type)
2524      throws IOException, SnapshotCreationException, IllegalArgumentException {
2525    snapshot(new SnapshotDescription(snapshotName, tableName, type));
2526  }
2527
2528  @Override
2529  public void snapshot(SnapshotDescription snapshotDesc)
2530      throws IOException, SnapshotCreationException, IllegalArgumentException {
2531    // actually take the snapshot
2532    SnapshotProtos.SnapshotDescription snapshot =
2533      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
2534    SnapshotResponse response = asyncSnapshot(snapshot);
2535    final IsSnapshotDoneRequest request =
2536        IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build();
2537    IsSnapshotDoneResponse done = null;
2538    long start = EnvironmentEdgeManager.currentTime();
2539    long max = response.getExpectedTimeout();
2540    long maxPauseTime = max / this.numRetries;
2541    int tries = 0;
2542    LOG.debug("Waiting a max of " + max + " ms for snapshot '" +
2543        ClientSnapshotDescriptionUtils.toString(snapshot) + "'' to complete. (max " +
2544        maxPauseTime + " ms per retry)");
2545    while (tries == 0
2546        || ((EnvironmentEdgeManager.currentTime() - start) < max && !done.getDone())) {
2547      try {
2548        // sleep a backoff <= pauseTime amount
2549        long sleep = getPauseTime(tries++);
2550        sleep = sleep > maxPauseTime ? maxPauseTime : sleep;
2551        LOG.debug("(#" + tries + ") Sleeping: " + sleep +
2552          "ms while waiting for snapshot completion.");
2553        Thread.sleep(sleep);
2554      } catch (InterruptedException e) {
2555        throw (InterruptedIOException)new InterruptedIOException("Interrupted").initCause(e);
2556      }
2557      LOG.debug("Getting current status of snapshot from master...");
2558      done = executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection(),
2559          getRpcControllerFactory()) {
2560        @Override
2561        protected IsSnapshotDoneResponse rpcCall() throws Exception {
2562          return master.isSnapshotDone(getRpcController(), request);
2563        }
2564      });
2565    }
2566    if (!done.getDone()) {
2567      throw new SnapshotCreationException("Snapshot '" + snapshot.getName()
2568          + "' wasn't completed in expectedTime:" + max + " ms", snapshotDesc);
2569    }
2570  }
2571
2572  @Override
2573  public void snapshotAsync(SnapshotDescription snapshotDesc) throws IOException,
2574      SnapshotCreationException {
2575    asyncSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc));
2576  }
2577
2578  private SnapshotResponse asyncSnapshot(SnapshotProtos.SnapshotDescription snapshot)
2579      throws IOException {
2580    ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2581    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot)
2582        .build();
2583    // run the snapshot on the master
2584    return executeCallable(new MasterCallable<SnapshotResponse>(getConnection(),
2585        getRpcControllerFactory()) {
2586      @Override
2587      protected SnapshotResponse rpcCall() throws Exception {
2588        return master.snapshot(getRpcController(), request);
2589      }
2590    });
2591  }
2592
2593  @Override
2594  public boolean isSnapshotFinished(final SnapshotDescription snapshotDesc)
2595      throws IOException, HBaseSnapshotException, UnknownSnapshotException {
2596    final SnapshotProtos.SnapshotDescription snapshot =
2597        ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
2598    return executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection(),
2599        getRpcControllerFactory()) {
2600      @Override
2601      protected IsSnapshotDoneResponse rpcCall() throws Exception {
2602        return master.isSnapshotDone(getRpcController(),
2603          IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build());
2604      }
2605    }).getDone();
2606  }
2607
2608  @Override
2609  public void restoreSnapshot(final byte[] snapshotName)
2610      throws IOException, RestoreSnapshotException {
2611    restoreSnapshot(Bytes.toString(snapshotName));
2612  }
2613
2614  @Override
2615  public void restoreSnapshot(final String snapshotName)
2616      throws IOException, RestoreSnapshotException {
2617    boolean takeFailSafeSnapshot =
2618        conf.getBoolean(HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
2619          HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
2620    restoreSnapshot(snapshotName, takeFailSafeSnapshot);
2621  }
2622
2623  @Override
2624  public void restoreSnapshot(final byte[] snapshotName, final boolean takeFailSafeSnapshot)
2625      throws IOException, RestoreSnapshotException {
2626    restoreSnapshot(Bytes.toString(snapshotName), takeFailSafeSnapshot);
2627  }
2628
2629  /*
2630   * Check whether the snapshot exists and contains disabled table
2631   *
2632   * @param snapshotName name of the snapshot to restore
2633   * @throws IOException if a remote or network exception occurs
2634   * @throws RestoreSnapshotException if no valid snapshot is found
2635   */
2636  private TableName getTableNameBeforeRestoreSnapshot(final String snapshotName)
2637      throws IOException, RestoreSnapshotException {
2638    TableName tableName = null;
2639    for (SnapshotDescription snapshotInfo: listSnapshots()) {
2640      if (snapshotInfo.getName().equals(snapshotName)) {
2641        tableName = snapshotInfo.getTableName();
2642        break;
2643      }
2644    }
2645
2646    if (tableName == null) {
2647      throw new RestoreSnapshotException(
2648        "Unable to find the table name for snapshot=" + snapshotName);
2649    }
2650    return tableName;
2651  }
2652
2653  @Override
2654  public void restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot)
2655      throws IOException, RestoreSnapshotException {
2656    restoreSnapshot(snapshotName, takeFailSafeSnapshot, false);
2657  }
2658
2659  @Override
2660  public void restoreSnapshot(final String snapshotName, final boolean takeFailSafeSnapshot,
2661      final boolean restoreAcl) throws IOException, RestoreSnapshotException {
2662    TableName tableName = getTableNameBeforeRestoreSnapshot(snapshotName);
2663
2664    // The table does not exists, switch to clone.
2665    if (!tableExists(tableName)) {
2666      cloneSnapshot(snapshotName, tableName, restoreAcl);
2667      return;
2668    }
2669
2670    // Check if the table is disabled
2671    if (!isTableDisabled(tableName)) {
2672      throw new TableNotDisabledException(tableName);
2673    }
2674
2675    // Take a snapshot of the current state
2676    String failSafeSnapshotSnapshotName = null;
2677    if (takeFailSafeSnapshot) {
2678      failSafeSnapshotSnapshotName = conf.get("hbase.snapshot.restore.failsafe.name",
2679        "hbase-failsafe-{snapshot.name}-{restore.timestamp}");
2680      failSafeSnapshotSnapshotName = failSafeSnapshotSnapshotName
2681        .replace("{snapshot.name}", snapshotName)
2682        .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
2683        .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
2684      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2685      snapshot(failSafeSnapshotSnapshotName, tableName);
2686    }
2687
2688    try {
2689      // Restore snapshot
2690      get(
2691        internalRestoreSnapshotAsync(snapshotName, tableName, restoreAcl),
2692        syncWaitTimeout,
2693        TimeUnit.MILLISECONDS);
2694    } catch (IOException e) {
2695      // Something went wrong during the restore...
2696      // if the pre-restore snapshot is available try to rollback
2697      if (takeFailSafeSnapshot) {
2698        try {
2699          get(
2700            internalRestoreSnapshotAsync(failSafeSnapshotSnapshotName, tableName, restoreAcl),
2701            syncWaitTimeout,
2702            TimeUnit.MILLISECONDS);
2703          String msg = "Restore snapshot=" + snapshotName +
2704            " failed. Rollback to snapshot=" + failSafeSnapshotSnapshotName + " succeeded.";
2705          LOG.error(msg, e);
2706          throw new RestoreSnapshotException(msg, e);
2707        } catch (IOException ex) {
2708          String msg = "Failed to restore and rollback to snapshot=" + failSafeSnapshotSnapshotName;
2709          LOG.error(msg, ex);
2710          throw new RestoreSnapshotException(msg, e);
2711        }
2712      } else {
2713        throw new RestoreSnapshotException("Failed to restore snapshot=" + snapshotName, e);
2714      }
2715    }
2716
2717    // If the restore is succeeded, delete the pre-restore snapshot
2718    if (takeFailSafeSnapshot) {
2719      try {
2720        LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2721        deleteSnapshot(failSafeSnapshotSnapshotName);
2722      } catch (IOException e) {
2723        LOG.error("Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, e);
2724      }
2725    }
2726  }
2727
2728  @Override
2729  public Future<Void> restoreSnapshotAsync(final String snapshotName)
2730      throws IOException, RestoreSnapshotException {
2731    TableName tableName = getTableNameBeforeRestoreSnapshot(snapshotName);
2732
2733    // The table does not exists, switch to clone.
2734    if (!tableExists(tableName)) {
2735      return cloneSnapshotAsync(snapshotName, tableName);
2736    }
2737
2738    // Check if the table is disabled
2739    if (!isTableDisabled(tableName)) {
2740      throw new TableNotDisabledException(tableName);
2741    }
2742
2743    return internalRestoreSnapshotAsync(snapshotName, tableName, false);
2744  }
2745
2746  @Override
2747  public void cloneSnapshot(final byte[] snapshotName, final TableName tableName)
2748      throws IOException, TableExistsException, RestoreSnapshotException {
2749    cloneSnapshot(Bytes.toString(snapshotName), tableName);
2750  }
2751
2752  @Override
2753  public void cloneSnapshot(String snapshotName, TableName tableName, boolean restoreAcl)
2754      throws IOException, TableExistsException, RestoreSnapshotException {
2755    if (tableExists(tableName)) {
2756      throw new TableExistsException(tableName);
2757    }
2758    get(
2759      internalRestoreSnapshotAsync(snapshotName, tableName, restoreAcl),
2760      Integer.MAX_VALUE,
2761      TimeUnit.MILLISECONDS);
2762  }
2763
2764  @Override
2765  public void cloneSnapshot(final String snapshotName, final TableName tableName)
2766      throws IOException, TableExistsException, RestoreSnapshotException {
2767    cloneSnapshot(snapshotName, tableName, false);
2768  }
2769
2770  @Override
2771  public Future<Void> cloneSnapshotAsync(final String snapshotName, final TableName tableName)
2772      throws IOException, TableExistsException {
2773    if (tableExists(tableName)) {
2774      throw new TableExistsException(tableName);
2775    }
2776    return internalRestoreSnapshotAsync(snapshotName, tableName, false);
2777  }
2778
2779  @Override
2780  public byte[] execProcedureWithReturn(String signature, String instance, Map<String,
2781      String> props) throws IOException {
2782    ProcedureDescription desc = ProtobufUtil.buildProcedureDescription(signature, instance, props);
2783    final ExecProcedureRequest request =
2784        ExecProcedureRequest.newBuilder().setProcedure(desc).build();
2785    // run the procedure on the master
2786    ExecProcedureResponse response = executeCallable(
2787      new MasterCallable<ExecProcedureResponse>(getConnection(), getRpcControllerFactory()) {
2788        @Override
2789        protected ExecProcedureResponse rpcCall() throws Exception {
2790          return master.execProcedureWithRet(getRpcController(), request);
2791        }
2792      });
2793
2794    return response.hasReturnData() ? response.getReturnData().toByteArray() : null;
2795  }
2796
2797  @Override
2798  public void execProcedure(String signature, String instance, Map<String, String> props)
2799      throws IOException {
2800    ProcedureDescription desc = ProtobufUtil.buildProcedureDescription(signature, instance, props);
2801    final ExecProcedureRequest request =
2802        ExecProcedureRequest.newBuilder().setProcedure(desc).build();
2803    // run the procedure on the master
2804    ExecProcedureResponse response = executeCallable(new MasterCallable<ExecProcedureResponse>(
2805        getConnection(), getRpcControllerFactory()) {
2806      @Override
2807      protected ExecProcedureResponse rpcCall() throws Exception {
2808        return master.execProcedure(getRpcController(), request);
2809      }
2810    });
2811
2812    long start = EnvironmentEdgeManager.currentTime();
2813    long max = response.getExpectedTimeout();
2814    long maxPauseTime = max / this.numRetries;
2815    int tries = 0;
2816    LOG.debug("Waiting a max of " + max + " ms for procedure '" +
2817        signature + " : " + instance + "'' to complete. (max " + maxPauseTime + " ms per retry)");
2818    boolean done = false;
2819    while (tries == 0
2820        || ((EnvironmentEdgeManager.currentTime() - start) < max && !done)) {
2821      try {
2822        // sleep a backoff <= pauseTime amount
2823        long sleep = getPauseTime(tries++);
2824        sleep = sleep > maxPauseTime ? maxPauseTime : sleep;
2825        LOG.debug("(#" + tries + ") Sleeping: " + sleep +
2826          "ms while waiting for procedure completion.");
2827        Thread.sleep(sleep);
2828      } catch (InterruptedException e) {
2829        throw (InterruptedIOException)new InterruptedIOException("Interrupted").initCause(e);
2830      }
2831      LOG.debug("Getting current status of procedure from master...");
2832      done = isProcedureFinished(signature, instance, props);
2833    }
2834    if (!done) {
2835      throw new IOException("Procedure '" + signature + " : " + instance
2836          + "' wasn't completed in expectedTime:" + max + " ms");
2837    }
2838  }
2839
2840  @Override
2841  public boolean isProcedureFinished(String signature, String instance, Map<String, String> props)
2842      throws IOException {
2843    ProcedureDescription desc = ProtobufUtil.buildProcedureDescription(signature, instance, props);
2844    return executeCallable(
2845      new MasterCallable<IsProcedureDoneResponse>(getConnection(), getRpcControllerFactory()) {
2846        @Override
2847        protected IsProcedureDoneResponse rpcCall() throws Exception {
2848          return master.isProcedureDone(getRpcController(),
2849            IsProcedureDoneRequest.newBuilder().setProcedure(desc).build());
2850        }
2851      }).getDone();
2852  }
2853
2854  /**
2855   * Execute Restore/Clone snapshot and wait for the server to complete (blocking).
2856   * To check if the cloned table exists, use {@link #isTableAvailable} -- it is not safe to
2857   * create an HTable instance to this table before it is available.
2858   * @param snapshotName snapshot to restore
2859   * @param tableName table name to restore the snapshot on
2860   * @throws IOException if a remote or network exception occurs
2861   * @throws RestoreSnapshotException if snapshot failed to be restored
2862   * @throws IllegalArgumentException if the restore request is formatted incorrectly
2863   */
2864  private Future<Void> internalRestoreSnapshotAsync(final String snapshotName,
2865      final TableName tableName, final boolean restoreAcl)
2866      throws IOException, RestoreSnapshotException {
2867    final SnapshotProtos.SnapshotDescription snapshot =
2868        SnapshotProtos.SnapshotDescription.newBuilder()
2869        .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2870
2871    // actually restore the snapshot
2872    ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2873
2874    RestoreSnapshotResponse response = executeCallable(
2875        new MasterCallable<RestoreSnapshotResponse>(getConnection(), getRpcControllerFactory()) {
2876          Long nonceGroup = ng.getNonceGroup();
2877          Long nonce = ng.newNonce();
2878      @Override
2879      protected RestoreSnapshotResponse rpcCall() throws Exception {
2880        final RestoreSnapshotRequest request = RestoreSnapshotRequest.newBuilder()
2881            .setSnapshot(snapshot)
2882            .setNonceGroup(nonceGroup)
2883            .setNonce(nonce)
2884            .setRestoreACL(restoreAcl)
2885            .build();
2886        return master.restoreSnapshot(getRpcController(), request);
2887      }
2888    });
2889
2890    return new RestoreSnapshotFuture(this, snapshot, tableName, response);
2891  }
2892
2893  private static class RestoreSnapshotFuture extends TableFuture<Void> {
2894    public RestoreSnapshotFuture(
2895        final HBaseAdmin admin,
2896        final SnapshotProtos.SnapshotDescription snapshot,
2897        final TableName tableName,
2898        final RestoreSnapshotResponse response) {
2899      super(admin, tableName,
2900          (response != null && response.hasProcId()) ? response.getProcId() : null);
2901
2902      if (response != null && !response.hasProcId()) {
2903        throw new UnsupportedOperationException("Client could not call old version of Server");
2904      }
2905    }
2906
2907    public RestoreSnapshotFuture(
2908        final HBaseAdmin admin,
2909        final TableName tableName,
2910        final Long procId) {
2911      super(admin, tableName, procId);
2912    }
2913
2914    @Override
2915    public String getOperationType() {
2916      return "MODIFY";
2917    }
2918  }
2919
2920  @Override
2921  public List<SnapshotDescription> listSnapshots() throws IOException {
2922    return executeCallable(new MasterCallable<List<SnapshotDescription>>(getConnection(),
2923        getRpcControllerFactory()) {
2924      @Override
2925      protected List<SnapshotDescription> rpcCall() throws Exception {
2926        List<SnapshotProtos.SnapshotDescription> snapshotsList = master
2927            .getCompletedSnapshots(getRpcController(),
2928                GetCompletedSnapshotsRequest.newBuilder().build())
2929            .getSnapshotsList();
2930        List<SnapshotDescription> result = new ArrayList<>(snapshotsList.size());
2931        for (SnapshotProtos.SnapshotDescription snapshot : snapshotsList) {
2932          result.add(ProtobufUtil.createSnapshotDesc(snapshot));
2933        }
2934        return result;
2935      }
2936    });
2937  }
2938
2939  @Override
2940  public List<SnapshotDescription> listSnapshots(String regex) throws IOException {
2941    return listSnapshots(Pattern.compile(regex));
2942  }
2943
2944  @Override
2945  public List<SnapshotDescription> listSnapshots(Pattern pattern) throws IOException {
2946    List<SnapshotDescription> matched = new LinkedList<>();
2947    List<SnapshotDescription> snapshots = listSnapshots();
2948    for (SnapshotDescription snapshot : snapshots) {
2949      if (pattern.matcher(snapshot.getName()).matches()) {
2950        matched.add(snapshot);
2951      }
2952    }
2953    return matched;
2954  }
2955
2956  @Override
2957  public List<SnapshotDescription> listTableSnapshots(String tableNameRegex,
2958      String snapshotNameRegex) throws IOException {
2959    return listTableSnapshots(Pattern.compile(tableNameRegex), Pattern.compile(snapshotNameRegex));
2960  }
2961
2962  @Override
2963  public List<SnapshotDescription> listTableSnapshots(Pattern tableNamePattern,
2964      Pattern snapshotNamePattern) throws IOException {
2965    TableName[] tableNames = listTableNames(tableNamePattern);
2966
2967    List<SnapshotDescription> tableSnapshots = new LinkedList<>();
2968    List<SnapshotDescription> snapshots = listSnapshots(snapshotNamePattern);
2969
2970    List<TableName> listOfTableNames = Arrays.asList(tableNames);
2971    for (SnapshotDescription snapshot : snapshots) {
2972      if (listOfTableNames.contains(snapshot.getTableName())) {
2973        tableSnapshots.add(snapshot);
2974      }
2975    }
2976    return tableSnapshots;
2977  }
2978
2979  @Override
2980  public void deleteSnapshot(final byte[] snapshotName) throws IOException {
2981    deleteSnapshot(Bytes.toString(snapshotName));
2982  }
2983
2984  @Override
2985  public void deleteSnapshot(final String snapshotName) throws IOException {
2986    // make sure the snapshot is possibly valid
2987    TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(snapshotName));
2988    // do the delete
2989    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
2990      @Override
2991      protected Void rpcCall() throws Exception {
2992        master.deleteSnapshot(getRpcController(),
2993          DeleteSnapshotRequest.newBuilder().setSnapshot(
2994                SnapshotProtos.SnapshotDescription.newBuilder().setName(snapshotName).build())
2995              .build()
2996        );
2997        return null;
2998      }
2999    });
3000  }
3001
3002  @Override
3003  public void deleteSnapshots(final String regex) throws IOException {
3004    deleteSnapshots(Pattern.compile(regex));
3005  }
3006
3007  @Override
3008  public void deleteSnapshots(final Pattern pattern) throws IOException {
3009    List<SnapshotDescription> snapshots = listSnapshots(pattern);
3010    for (final SnapshotDescription snapshot : snapshots) {
3011      try {
3012        internalDeleteSnapshot(snapshot);
3013      } catch (IOException ex) {
3014        LOG.info("Failed to delete snapshot " + snapshot.getName() + " for table "
3015                + snapshot.getTableNameAsString(), ex);
3016      }
3017    }
3018  }
3019
3020  private void internalDeleteSnapshot(final SnapshotDescription snapshot) throws IOException {
3021    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
3022      @Override
3023      protected Void rpcCall() throws Exception {
3024        this.master.deleteSnapshot(getRpcController(), DeleteSnapshotRequest.newBuilder()
3025          .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build());
3026        return null;
3027      }
3028    });
3029  }
3030
3031  @Override
3032  public void deleteTableSnapshots(String tableNameRegex, String snapshotNameRegex)
3033      throws IOException {
3034    deleteTableSnapshots(Pattern.compile(tableNameRegex), Pattern.compile(snapshotNameRegex));
3035  }
3036
3037  @Override
3038  public void deleteTableSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern)
3039      throws IOException {
3040    List<SnapshotDescription> snapshots = listTableSnapshots(tableNamePattern, snapshotNamePattern);
3041    for (SnapshotDescription snapshot : snapshots) {
3042      try {
3043        internalDeleteSnapshot(snapshot);
3044        LOG.debug("Successfully deleted snapshot: " + snapshot.getName());
3045      } catch (IOException e) {
3046        LOG.error("Failed to delete snapshot: " + snapshot.getName(), e);
3047      }
3048    }
3049  }
3050
3051  @Override
3052  public void setQuota(final QuotaSettings quota) throws IOException {
3053    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
3054      @Override
3055      protected Void rpcCall() throws Exception {
3056        this.master.setQuota(getRpcController(), QuotaSettings.buildSetQuotaRequestProto(quota));
3057        return null;
3058      }
3059    });
3060  }
3061
3062  @Override
3063  public QuotaRetriever getQuotaRetriever(final QuotaFilter filter) throws IOException {
3064    return QuotaRetriever.open(conf, filter);
3065  }
3066
3067  @Override
3068  public List<QuotaSettings> getQuota(QuotaFilter filter) throws IOException {
3069    List<QuotaSettings> quotas = new LinkedList<>();
3070    try (QuotaRetriever retriever = QuotaRetriever.open(conf, filter)) {
3071      Iterator<QuotaSettings> iterator = retriever.iterator();
3072      while (iterator.hasNext()) {
3073        quotas.add(iterator.next());
3074      }
3075    }
3076    return quotas;
3077  }
3078
3079  private <C extends RetryingCallable<V> & Closeable, V> V executeCallable(C callable)
3080      throws IOException {
3081    return executeCallable(callable, rpcCallerFactory, operationTimeout, rpcTimeout);
3082  }
3083
3084  static private <C extends RetryingCallable<V> & Closeable, V> V executeCallable(C callable,
3085             RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout, int rpcTimeout)
3086  throws IOException {
3087    RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller(rpcTimeout);
3088    try {
3089      return caller.callWithRetries(callable, operationTimeout);
3090    } finally {
3091      callable.close();
3092    }
3093  }
3094
3095  @Override
3096  // Coprocessor Endpoint against the Master.
3097  public CoprocessorRpcChannel coprocessorService() {
3098    return new SyncCoprocessorRpcChannel() {
3099      @Override
3100      protected Message callExecService(final RpcController controller,
3101          final Descriptors.MethodDescriptor method, final Message request,
3102          final Message responsePrototype)
3103      throws IOException {
3104        if (LOG.isTraceEnabled()) {
3105          LOG.trace("Call: " + method.getName() + ", " + request.toString());
3106        }
3107        // Try-with-resources so close gets called when we are done.
3108        try (MasterCallable<CoprocessorServiceResponse> callable =
3109            new MasterCallable<CoprocessorServiceResponse>(connection,
3110                connection.getRpcControllerFactory()) {
3111          @Override
3112          protected CoprocessorServiceResponse rpcCall() throws Exception {
3113            CoprocessorServiceRequest csr =
3114                CoprocessorRpcUtils.getCoprocessorServiceRequest(method, request);
3115            return this.master.execMasterService(getRpcController(), csr);
3116          }
3117        }) {
3118          // TODO: Are we retrying here? Does not seem so. We should use RetryingRpcCaller
3119          callable.prepare(false);
3120          int operationTimeout = connection.getConnectionConfiguration().getOperationTimeout();
3121          CoprocessorServiceResponse result = callable.call(operationTimeout);
3122          return CoprocessorRpcUtils.getResponse(result, responsePrototype);
3123        }
3124      }
3125    };
3126  }
3127
3128  /**
3129   * Simple {@link Abortable}, throwing RuntimeException on abort.
3130   */
3131  private static class ThrowableAbortable implements Abortable {
3132    @Override
3133    public void abort(String why, Throwable e) {
3134      throw new RuntimeException(why, e);
3135    }
3136
3137    @Override
3138    public boolean isAborted() {
3139      return true;
3140    }
3141  }
3142
3143  @Override
3144  public CoprocessorRpcChannel coprocessorService(final ServerName serverName) {
3145    return new SyncCoprocessorRpcChannel() {
3146      @Override
3147      protected Message callExecService(RpcController controller,
3148          Descriptors.MethodDescriptor method, Message request, Message responsePrototype)
3149      throws IOException {
3150        if (LOG.isTraceEnabled()) {
3151          LOG.trace("Call: " + method.getName() + ", " + request.toString());
3152        }
3153        CoprocessorServiceRequest csr =
3154            CoprocessorRpcUtils.getCoprocessorServiceRequest(method, request);
3155        // TODO: Are we retrying here? Does not seem so. We should use RetryingRpcCaller
3156        // TODO: Make this same as RegionCoprocessorRpcChannel and MasterCoprocessorRpcChannel. They
3157        // are all different though should do same thing; e.g. RpcChannel setup.
3158        ClientProtos.ClientService.BlockingInterface stub = connection.getClient(serverName);
3159        CoprocessorServiceResponse result;
3160        try {
3161          result = stub.
3162              execRegionServerService(connection.getRpcControllerFactory().newController(), csr);
3163          return CoprocessorRpcUtils.getResponse(result, responsePrototype);
3164        } catch (ServiceException e) {
3165          throw ProtobufUtil.handleRemoteException(e);
3166        }
3167      }
3168    };
3169  }
3170
3171  @Override
3172  public void updateConfiguration(final ServerName server) throws IOException {
3173    final AdminService.BlockingInterface admin = this.connection.getAdmin(server);
3174    Callable<Void> callable = new Callable<Void>() {
3175      @Override
3176      public Void call() throws Exception {
3177        admin.updateConfiguration(null, UpdateConfigurationRequest.getDefaultInstance());
3178        return null;
3179      }
3180    };
3181    ProtobufUtil.call(callable);
3182  }
3183
3184  @Override
3185  public void updateConfiguration() throws IOException {
3186    ClusterMetrics status = getClusterMetrics(
3187      EnumSet.of(Option.LIVE_SERVERS, Option.MASTER, Option.BACKUP_MASTERS));
3188    for (ServerName server : status.getLiveServerMetrics().keySet()) {
3189      updateConfiguration(server);
3190    }
3191
3192    updateConfiguration(status.getMasterName());
3193
3194    for (ServerName server : status.getBackupMasterNames()) {
3195      updateConfiguration(server);
3196    }
3197  }
3198
3199  @Override
3200  public long getLastMajorCompactionTimestamp(final TableName tableName) throws IOException {
3201    return executeCallable(new MasterCallable<Long>(getConnection(), getRpcControllerFactory()) {
3202      @Override
3203      protected Long rpcCall() throws Exception {
3204        MajorCompactionTimestampRequest req =
3205            MajorCompactionTimestampRequest.newBuilder()
3206                .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
3207        return master.getLastMajorCompactionTimestamp(getRpcController(), req).
3208            getCompactionTimestamp();
3209      }
3210    });
3211  }
3212
3213  @Override
3214  public long getLastMajorCompactionTimestampForRegion(final byte[] regionName) throws IOException {
3215    return executeCallable(new MasterCallable<Long>(getConnection(), getRpcControllerFactory()) {
3216      @Override
3217      protected Long rpcCall() throws Exception {
3218        MajorCompactionTimestampForRegionRequest req =
3219            MajorCompactionTimestampForRegionRequest.newBuilder().setRegion(RequestConverter
3220                      .buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)).build();
3221        return master.getLastMajorCompactionTimestampForRegion(getRpcController(), req)
3222            .getCompactionTimestamp();
3223      }
3224    });
3225  }
3226
3227  /**
3228   * {@inheritDoc}
3229   */
3230  @Override
3231  public void compact(final TableName tableName, final byte[] columnFamily, CompactType compactType)
3232    throws IOException, InterruptedException {
3233    compact(tableName, columnFamily, false, compactType);
3234  }
3235
3236  /**
3237   * {@inheritDoc}
3238   */
3239  @Override
3240  public void compact(final TableName tableName, CompactType compactType)
3241    throws IOException, InterruptedException {
3242    compact(tableName, null, false, compactType);
3243  }
3244
3245  /**
3246   * {@inheritDoc}
3247   */
3248  @Override
3249  public void majorCompact(final TableName tableName, final byte[] columnFamily,
3250    CompactType compactType) throws IOException, InterruptedException {
3251    compact(tableName, columnFamily, true, compactType);
3252  }
3253
3254  /**
3255   * {@inheritDoc}
3256   */
3257  @Override
3258  public void majorCompact(final TableName tableName, CompactType compactType)
3259          throws IOException, InterruptedException {
3260    compact(tableName, null, true, compactType);
3261  }
3262
3263  /**
3264   * {@inheritDoc}
3265   */
3266  @Override
3267  public CompactionState getCompactionState(final TableName tableName, CompactType compactType)
3268      throws IOException {
3269    AdminProtos.GetRegionInfoResponse.CompactionState state =
3270      AdminProtos.GetRegionInfoResponse.CompactionState.NONE;
3271    checkTableExists(tableName);
3272    // TODO: There is no timeout on this controller. Set one!
3273    HBaseRpcController rpcController = rpcControllerFactory.newController();
3274    switch (compactType) {
3275      case MOB:
3276        final AdminProtos.AdminService.BlockingInterface masterAdmin =
3277          this.connection.getAdminForMaster();
3278        Callable<AdminProtos.GetRegionInfoResponse.CompactionState> callable =
3279          new Callable<AdminProtos.GetRegionInfoResponse.CompactionState>() {
3280            @Override
3281            public AdminProtos.GetRegionInfoResponse.CompactionState call() throws Exception {
3282              RegionInfo info = RegionInfo.createMobRegionInfo(tableName);
3283              GetRegionInfoRequest request =
3284                RequestConverter.buildGetRegionInfoRequest(info.getRegionName(), true);
3285              GetRegionInfoResponse response = masterAdmin.getRegionInfo(rpcController, request);
3286              return response.getCompactionState();
3287            }
3288          };
3289        state = ProtobufUtil.call(callable);
3290        break;
3291      case NORMAL:
3292        for (HRegionLocation loc : connection.locateRegions(tableName, false, false)) {
3293          ServerName sn = loc.getServerName();
3294          if (sn == null) {
3295            continue;
3296          }
3297          byte[] regionName = loc.getRegion().getRegionName();
3298          AdminService.BlockingInterface snAdmin = this.connection.getAdmin(sn);
3299          try {
3300            Callable<GetRegionInfoResponse> regionInfoCallable =
3301              new Callable<GetRegionInfoResponse>() {
3302                @Override
3303                public GetRegionInfoResponse call() throws Exception {
3304                  GetRegionInfoRequest request =
3305                    RequestConverter.buildGetRegionInfoRequest(regionName, true);
3306                  return snAdmin.getRegionInfo(rpcController, request);
3307                }
3308              };
3309            GetRegionInfoResponse response = ProtobufUtil.call(regionInfoCallable);
3310            switch (response.getCompactionState()) {
3311              case MAJOR_AND_MINOR:
3312                return CompactionState.MAJOR_AND_MINOR;
3313              case MAJOR:
3314                if (state == AdminProtos.GetRegionInfoResponse.CompactionState.MINOR) {
3315                  return CompactionState.MAJOR_AND_MINOR;
3316                }
3317                state = AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR;
3318                break;
3319              case MINOR:
3320                if (state == AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR) {
3321                  return CompactionState.MAJOR_AND_MINOR;
3322                }
3323                state = AdminProtos.GetRegionInfoResponse.CompactionState.MINOR;
3324                break;
3325              case NONE:
3326              default: // nothing, continue
3327            }
3328          } catch (NotServingRegionException e) {
3329            if (LOG.isDebugEnabled()) {
3330              LOG.debug("Trying to get compaction state of " + loc.getRegion() + ": " +
3331                StringUtils.stringifyException(e));
3332            }
3333          } catch (RemoteException e) {
3334            if (e.getMessage().indexOf(NotServingRegionException.class.getName()) >= 0) {
3335              if (LOG.isDebugEnabled()) {
3336                LOG.debug("Trying to get compaction state of " + loc.getRegion() + ": " +
3337                  StringUtils.stringifyException(e));
3338              }
3339            } else {
3340              throw e;
3341            }
3342          }
3343        }
3344        break;
3345      default:
3346        throw new IllegalArgumentException("Unknown compactType: " + compactType);
3347    }
3348    if (state != null) {
3349      return ProtobufUtil.createCompactionState(state);
3350    }
3351    return null;
3352  }
3353
3354  /**
3355   * Future that waits on a procedure result.
3356   * Returned by the async version of the Admin calls,
3357   * and used internally by the sync calls to wait on the result of the procedure.
3358   */
3359  @InterfaceAudience.Private
3360  @InterfaceStability.Evolving
3361  protected static class ProcedureFuture<V> implements Future<V> {
3362    private ExecutionException exception = null;
3363    private boolean procResultFound = false;
3364    private boolean done = false;
3365    private boolean cancelled = false;
3366    private V result = null;
3367
3368    private final HBaseAdmin admin;
3369    protected final Long procId;
3370
3371    public ProcedureFuture(final HBaseAdmin admin, final Long procId) {
3372      this.admin = admin;
3373      this.procId = procId;
3374    }
3375
3376    @Override
3377    public boolean cancel(boolean mayInterruptIfRunning) {
3378      AbortProcedureRequest abortProcRequest = AbortProcedureRequest.newBuilder()
3379          .setProcId(procId).setMayInterruptIfRunning(mayInterruptIfRunning).build();
3380      try {
3381        cancelled = abortProcedureResult(abortProcRequest).getIsProcedureAborted();
3382        if (cancelled) {
3383          done = true;
3384        }
3385      } catch (IOException e) {
3386        // Cancell thrown exception for some reason. At this time, we are not sure whether
3387        // the cancell succeeds or fails. We assume that it is failed, but print out a warning
3388        // for debugging purpose.
3389        LOG.warn(
3390          "Cancelling the procedure with procId=" + procId + " throws exception " + e.getMessage(),
3391          e);
3392        cancelled = false;
3393      }
3394      return cancelled;
3395    }
3396
3397    @Override
3398    public boolean isCancelled() {
3399      return cancelled;
3400    }
3401
3402    protected AbortProcedureResponse abortProcedureResult(
3403        final AbortProcedureRequest request) throws IOException {
3404      return admin.executeCallable(new MasterCallable<AbortProcedureResponse>(
3405          admin.getConnection(), admin.getRpcControllerFactory()) {
3406        @Override
3407        protected AbortProcedureResponse rpcCall() throws Exception {
3408          return master.abortProcedure(getRpcController(), request);
3409        }
3410      });
3411    }
3412
3413    @Override
3414    public V get() throws InterruptedException, ExecutionException {
3415      // TODO: should we ever spin forever?
3416      // fix HBASE-21715. TODO: If the function call get() without timeout limit is not allowed,
3417      // is it possible to compose instead of inheriting from the class Future for this class?
3418      try {
3419        return get(admin.getProcedureTimeout, TimeUnit.MILLISECONDS);
3420      } catch (TimeoutException e) {
3421        LOG.warn("Failed to get the procedure with procId=" + procId + " throws exception " + e
3422            .getMessage(), e);
3423        return null;
3424      }
3425    }
3426
3427    @Override
3428    public V get(long timeout, TimeUnit unit)
3429        throws InterruptedException, ExecutionException, TimeoutException {
3430      if (!done) {
3431        long deadlineTs = EnvironmentEdgeManager.currentTime() + unit.toMillis(timeout);
3432        try {
3433          try {
3434            // if the master support procedures, try to wait the result
3435            if (procId != null) {
3436              result = waitProcedureResult(procId, deadlineTs);
3437            }
3438            // if we don't have a proc result, try the compatibility wait
3439            if (!procResultFound) {
3440              result = waitOperationResult(deadlineTs);
3441            }
3442            result = postOperationResult(result, deadlineTs);
3443            done = true;
3444          } catch (IOException e) {
3445            result = postOperationFailure(e, deadlineTs);
3446            done = true;
3447          }
3448        } catch (IOException e) {
3449          exception = new ExecutionException(e);
3450          done = true;
3451        }
3452      }
3453      if (exception != null) {
3454        throw exception;
3455      }
3456      return result;
3457    }
3458
3459    @Override
3460    public boolean isDone() {
3461      return done;
3462    }
3463
3464    protected HBaseAdmin getAdmin() {
3465      return admin;
3466    }
3467
3468    private V waitProcedureResult(long procId, long deadlineTs)
3469        throws IOException, TimeoutException, InterruptedException {
3470      GetProcedureResultRequest request = GetProcedureResultRequest.newBuilder()
3471          .setProcId(procId)
3472          .build();
3473
3474      int tries = 0;
3475      IOException serviceEx = null;
3476      while (EnvironmentEdgeManager.currentTime() < deadlineTs) {
3477        GetProcedureResultResponse response = null;
3478        try {
3479          // Try to fetch the result
3480          response = getProcedureResult(request);
3481        } catch (IOException e) {
3482          serviceEx = unwrapException(e);
3483
3484          // the master may be down
3485          LOG.warn("failed to get the procedure result procId=" + procId, serviceEx);
3486
3487          // Not much to do, if we have a DoNotRetryIOException
3488          if (serviceEx instanceof DoNotRetryIOException) {
3489            // TODO: looks like there is no way to unwrap this exception and get the proper
3490            // UnsupportedOperationException aside from looking at the message.
3491            // anyway, if we fail here we just failover to the compatibility side
3492            // and that is always a valid solution.
3493            LOG.warn("Proc-v2 is unsupported on this master: " + serviceEx.getMessage(), serviceEx);
3494            procResultFound = false;
3495            return null;
3496          }
3497        }
3498
3499        // If the procedure is no longer running, we should have a result
3500        if (response != null && response.getState() != GetProcedureResultResponse.State.RUNNING) {
3501          procResultFound = response.getState() != GetProcedureResultResponse.State.NOT_FOUND;
3502          return convertResult(response);
3503        }
3504
3505        try {
3506          Thread.sleep(getAdmin().getPauseTime(tries++));
3507        } catch (InterruptedException e) {
3508          throw new InterruptedException(
3509            "Interrupted while waiting for the result of proc " + procId);
3510        }
3511      }
3512      if (serviceEx != null) {
3513        throw serviceEx;
3514      } else {
3515        throw new TimeoutException("The procedure " + procId + " is still running");
3516      }
3517    }
3518
3519    private static IOException unwrapException(IOException e) {
3520      if (e instanceof RemoteException) {
3521        return ((RemoteException)e).unwrapRemoteException();
3522      }
3523      return e;
3524    }
3525
3526    protected GetProcedureResultResponse getProcedureResult(final GetProcedureResultRequest request)
3527        throws IOException {
3528      return admin.executeCallable(new MasterCallable<GetProcedureResultResponse>(
3529          admin.getConnection(), admin.getRpcControllerFactory()) {
3530        @Override
3531        protected GetProcedureResultResponse rpcCall() throws Exception {
3532          return master.getProcedureResult(getRpcController(), request);
3533        }
3534      });
3535    }
3536
3537    /**
3538     * Convert the procedure result response to a specified type.
3539     * @param response the procedure result object to parse
3540     * @return the result data of the procedure.
3541     */
3542    protected V convertResult(final GetProcedureResultResponse response) throws IOException {
3543      if (response.hasException()) {
3544        throw ForeignExceptionUtil.toIOException(response.getException());
3545      }
3546      return null;
3547    }
3548
3549    /**
3550     * Fallback implementation in case the procedure is not supported by the server.
3551     * It should try to wait until the operation is completed.
3552     * @param deadlineTs the timestamp after which this method should throw a TimeoutException
3553     * @return the result data of the operation
3554     */
3555    protected V waitOperationResult(final long deadlineTs)
3556        throws IOException, TimeoutException {
3557      return null;
3558    }
3559
3560    /**
3561     * Called after the operation is completed and the result fetched. this allows to perform extra
3562     * steps after the procedure is completed. it allows to apply transformations to the result that
3563     * will be returned by get().
3564     * @param result the result of the procedure
3565     * @param deadlineTs the timestamp after which this method should throw a TimeoutException
3566     * @return the result of the procedure, which may be the same as the passed one
3567     */
3568    protected V postOperationResult(final V result, final long deadlineTs)
3569        throws IOException, TimeoutException {
3570      return result;
3571    }
3572
3573    /**
3574     * Called after the operation is terminated with a failure.
3575     * this allows to perform extra steps after the procedure is terminated.
3576     * it allows to apply transformations to the result that will be returned by get().
3577     * The default implementation will rethrow the exception
3578     * @param exception the exception got from fetching the result
3579     * @param deadlineTs the timestamp after which this method should throw a TimeoutException
3580     * @return the result of the procedure, which may be the same as the passed one
3581     */
3582    protected V postOperationFailure(final IOException exception, final long deadlineTs)
3583        throws IOException, TimeoutException {
3584      throw exception;
3585    }
3586
3587    protected interface WaitForStateCallable {
3588      boolean checkState(int tries) throws IOException;
3589      void throwInterruptedException() throws InterruptedIOException;
3590      void throwTimeoutException(long elapsed) throws TimeoutException;
3591    }
3592
3593    protected void waitForState(final long deadlineTs, final WaitForStateCallable callable)
3594        throws IOException, TimeoutException {
3595      int tries = 0;
3596      IOException serverEx = null;
3597      long startTime = EnvironmentEdgeManager.currentTime();
3598      while (EnvironmentEdgeManager.currentTime() < deadlineTs) {
3599        serverEx = null;
3600        try {
3601          if (callable.checkState(tries)) {
3602            return;
3603          }
3604        } catch (IOException e) {
3605          serverEx = e;
3606        }
3607        try {
3608          Thread.sleep(getAdmin().getPauseTime(tries++));
3609        } catch (InterruptedException e) {
3610          callable.throwInterruptedException();
3611        }
3612      }
3613      if (serverEx != null) {
3614        throw unwrapException(serverEx);
3615      } else {
3616        callable.throwTimeoutException(EnvironmentEdgeManager.currentTime() - startTime);
3617      }
3618    }
3619  }
3620
3621  @InterfaceAudience.Private
3622  @InterfaceStability.Evolving
3623  protected static abstract class TableFuture<V> extends ProcedureFuture<V> {
3624    private final TableName tableName;
3625
3626    public TableFuture(final HBaseAdmin admin, final TableName tableName, final Long procId) {
3627      super(admin, procId);
3628      this.tableName = tableName;
3629    }
3630
3631    @Override
3632    public String toString() {
3633      return getDescription();
3634    }
3635
3636    /**
3637     * @return the table name
3638     */
3639    protected TableName getTableName() {
3640      return tableName;
3641    }
3642
3643    /**
3644     * @return the table descriptor
3645     */
3646    protected TableDescriptor getTableDescriptor() throws IOException {
3647      return getAdmin().getDescriptor(getTableName());
3648    }
3649
3650    /**
3651     * @return the operation type like CREATE, DELETE, DISABLE etc.
3652     */
3653    public abstract String getOperationType();
3654
3655    /**
3656     * @return a description of the operation
3657     */
3658    protected String getDescription() {
3659      return "Operation: " + getOperationType() + ", " + "Table Name: " +
3660        tableName.getNameWithNamespaceInclAsString() + ", procId: " + procId;
3661    }
3662
3663    protected abstract class TableWaitForStateCallable implements WaitForStateCallable {
3664      @Override
3665      public void throwInterruptedException() throws InterruptedIOException {
3666        throw new InterruptedIOException("Interrupted while waiting for " + getDescription());
3667      }
3668
3669      @Override
3670      public void throwTimeoutException(long elapsedTime) throws TimeoutException {
3671        throw new TimeoutException(
3672          getDescription() + " has not completed after " + elapsedTime + "ms");
3673      }
3674    }
3675
3676    @Override
3677    protected V postOperationResult(final V result, final long deadlineTs)
3678        throws IOException, TimeoutException {
3679      LOG.info(getDescription() + " completed");
3680      return super.postOperationResult(result, deadlineTs);
3681    }
3682
3683    @Override
3684    protected V postOperationFailure(final IOException exception, final long deadlineTs)
3685        throws IOException, TimeoutException {
3686      LOG.info(getDescription() + " failed with " + exception.getMessage());
3687      return super.postOperationFailure(exception, deadlineTs);
3688    }
3689
3690    protected void waitForTableEnabled(final long deadlineTs)
3691        throws IOException, TimeoutException {
3692      waitForState(deadlineTs, new TableWaitForStateCallable() {
3693        @Override
3694        public boolean checkState(int tries) throws IOException {
3695          try {
3696            if (getAdmin().isTableAvailable(tableName)) {
3697              return true;
3698            }
3699          } catch (TableNotFoundException tnfe) {
3700            LOG.debug("Table " + tableName.getNameWithNamespaceInclAsString()
3701                + " was not enabled, sleeping. tries=" + tries);
3702          }
3703          return false;
3704        }
3705      });
3706    }
3707
3708    protected void waitForTableDisabled(final long deadlineTs)
3709        throws IOException, TimeoutException {
3710      waitForState(deadlineTs, new TableWaitForStateCallable() {
3711        @Override
3712        public boolean checkState(int tries) throws IOException {
3713          return getAdmin().isTableDisabled(tableName);
3714        }
3715      });
3716    }
3717
3718    protected void waitTableNotFound(final long deadlineTs)
3719        throws IOException, TimeoutException {
3720      waitForState(deadlineTs, new TableWaitForStateCallable() {
3721        @Override
3722        public boolean checkState(int tries) throws IOException {
3723          return !getAdmin().tableExists(tableName);
3724        }
3725      });
3726    }
3727
3728    protected void waitForSchemaUpdate(final long deadlineTs)
3729        throws IOException, TimeoutException {
3730      waitForState(deadlineTs, new TableWaitForStateCallable() {
3731        @Override
3732        public boolean checkState(int tries) throws IOException {
3733          return getAdmin().getAlterStatus(tableName).getFirst() == 0;
3734        }
3735      });
3736    }
3737
3738    protected void waitForAllRegionsOnline(final long deadlineTs, final byte[][] splitKeys)
3739        throws IOException, TimeoutException {
3740      final TableDescriptor desc = getTableDescriptor();
3741      final AtomicInteger actualRegCount = new AtomicInteger(0);
3742      final MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
3743        @Override
3744        public boolean visit(Result rowResult) throws IOException {
3745          RegionLocations list = MetaTableAccessor.getRegionLocations(rowResult);
3746          if (list == null) {
3747            LOG.warn("No serialized HRegionInfo in " + rowResult);
3748            return true;
3749          }
3750          HRegionLocation l = list.getRegionLocation();
3751          if (l == null) {
3752            return true;
3753          }
3754          if (!l.getRegionInfo().getTable().equals(desc.getTableName())) {
3755            return false;
3756          }
3757          if (l.getRegionInfo().isOffline() || l.getRegionInfo().isSplit()) return true;
3758          HRegionLocation[] locations = list.getRegionLocations();
3759          for (HRegionLocation location : locations) {
3760            if (location == null) continue;
3761            ServerName serverName = location.getServerName();
3762            // Make sure that regions are assigned to server
3763            if (serverName != null && serverName.getHostAndPort() != null) {
3764              actualRegCount.incrementAndGet();
3765            }
3766          }
3767          return true;
3768        }
3769      };
3770
3771      int tries = 0;
3772      int numRegs = (splitKeys == null ? 1 : splitKeys.length + 1) * desc.getRegionReplication();
3773      while (EnvironmentEdgeManager.currentTime() < deadlineTs) {
3774        actualRegCount.set(0);
3775        MetaTableAccessor.scanMetaForTableRegions(getAdmin().getConnection(), visitor,
3776          desc.getTableName());
3777        if (actualRegCount.get() == numRegs) {
3778          // all the regions are online
3779          return;
3780        }
3781
3782        try {
3783          Thread.sleep(getAdmin().getPauseTime(tries++));
3784        } catch (InterruptedException e) {
3785          throw new InterruptedIOException("Interrupted when opening" + " regions; "
3786              + actualRegCount.get() + " of " + numRegs + " regions processed so far");
3787        }
3788      }
3789      throw new TimeoutException("Only " + actualRegCount.get() + " of " + numRegs
3790          + " regions are online; retries exhausted.");
3791    }
3792  }
3793
3794  @InterfaceAudience.Private
3795  @InterfaceStability.Evolving
3796  protected static abstract class NamespaceFuture extends ProcedureFuture<Void> {
3797    private final String namespaceName;
3798
3799    public NamespaceFuture(final HBaseAdmin admin, final String namespaceName, final Long procId) {
3800      super(admin, procId);
3801      this.namespaceName = namespaceName;
3802    }
3803
3804    /**
3805     * @return the namespace name
3806     */
3807    protected String getNamespaceName() {
3808      return namespaceName;
3809    }
3810
3811    /**
3812     * @return the operation type like CREATE_NAMESPACE, DELETE_NAMESPACE, etc.
3813     */
3814    public abstract String getOperationType();
3815
3816    @Override
3817    public String toString() {
3818      return "Operation: " + getOperationType() + ", Namespace: " + getNamespaceName();
3819    }
3820  }
3821
3822  @InterfaceAudience.Private
3823  @InterfaceStability.Evolving
3824  private static class ReplicationFuture extends ProcedureFuture<Void> {
3825    private final String peerId;
3826    private final Supplier<String> getOperation;
3827
3828    public ReplicationFuture(HBaseAdmin admin, String peerId, Long procId,
3829        Supplier<String> getOperation) {
3830      super(admin, procId);
3831      this.peerId = peerId;
3832      this.getOperation = getOperation;
3833    }
3834
3835    @Override
3836    public String toString() {
3837      return "Operation: " + getOperation.get() + ", peerId: " + peerId;
3838    }
3839  }
3840
3841  @Override
3842  public List<SecurityCapability> getSecurityCapabilities() throws IOException {
3843    try {
3844      return executeCallable(new MasterCallable<List<SecurityCapability>>(getConnection(),
3845          getRpcControllerFactory()) {
3846        @Override
3847        protected List<SecurityCapability> rpcCall() throws Exception {
3848          SecurityCapabilitiesRequest req = SecurityCapabilitiesRequest.newBuilder().build();
3849          return ProtobufUtil.toSecurityCapabilityList(
3850            master.getSecurityCapabilities(getRpcController(), req).getCapabilitiesList());
3851        }
3852      });
3853    } catch (IOException e) {
3854      if (e instanceof RemoteException) {
3855        e = ((RemoteException)e).unwrapRemoteException();
3856      }
3857      throw e;
3858    }
3859  }
3860
3861  @Override
3862  public boolean splitSwitch(boolean enabled, boolean synchronous) throws IOException {
3863    return splitOrMergeSwitch(enabled, synchronous, MasterSwitchType.SPLIT);
3864  }
3865
3866  @Override
3867  public boolean mergeSwitch(boolean enabled, boolean synchronous) throws IOException {
3868    return splitOrMergeSwitch(enabled, synchronous, MasterSwitchType.MERGE);
3869  }
3870
3871  private boolean splitOrMergeSwitch(boolean enabled, boolean synchronous,
3872      MasterSwitchType switchType) throws IOException {
3873    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
3874      @Override
3875      protected Boolean rpcCall() throws Exception {
3876        MasterProtos.SetSplitOrMergeEnabledResponse response = master.setSplitOrMergeEnabled(
3877          getRpcController(),
3878          RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType));
3879        return response.getPrevValueList().get(0);
3880      }
3881    });
3882  }
3883
3884  @Override
3885  public boolean isSplitEnabled() throws IOException {
3886    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
3887      @Override
3888      protected Boolean rpcCall() throws Exception {
3889        return master.isSplitOrMergeEnabled(getRpcController(),
3890          RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.SPLIT)).getEnabled();
3891      }
3892    });
3893  }
3894
3895  @Override
3896  public boolean isMergeEnabled() throws IOException {
3897    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
3898      @Override
3899      protected Boolean rpcCall() throws Exception {
3900        return master.isSplitOrMergeEnabled(getRpcController(),
3901          RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.MERGE)).getEnabled();
3902      }
3903    });
3904  }
3905
3906  private RpcControllerFactory getRpcControllerFactory() {
3907    return this.rpcControllerFactory;
3908  }
3909
3910  @Override
3911  public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
3912      throws IOException {
3913    get(addReplicationPeerAsync(peerId, peerConfig, enabled), this.syncWaitTimeout,
3914      TimeUnit.MILLISECONDS);
3915  }
3916
3917  @Override
3918  public Future<Void> addReplicationPeerAsync(String peerId, ReplicationPeerConfig peerConfig,
3919      boolean enabled) throws IOException {
3920    AddReplicationPeerResponse response = executeCallable(
3921      new MasterCallable<AddReplicationPeerResponse>(getConnection(), getRpcControllerFactory()) {
3922        @Override
3923        protected AddReplicationPeerResponse rpcCall() throws Exception {
3924          return master.addReplicationPeer(getRpcController(),
3925            RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled));
3926        }
3927      });
3928    return new ReplicationFuture(this, peerId, response.getProcId(), () -> "ADD_REPLICATION_PEER");
3929  }
3930
3931  @Override
3932  public void removeReplicationPeer(String peerId) throws IOException {
3933    get(removeReplicationPeerAsync(peerId), this.syncWaitTimeout, TimeUnit.MILLISECONDS);
3934  }
3935
3936  @Override
3937  public Future<Void> removeReplicationPeerAsync(String peerId) throws IOException {
3938    RemoveReplicationPeerResponse response =
3939      executeCallable(new MasterCallable<RemoveReplicationPeerResponse>(getConnection(),
3940          getRpcControllerFactory()) {
3941        @Override
3942        protected RemoveReplicationPeerResponse rpcCall() throws Exception {
3943          return master.removeReplicationPeer(getRpcController(),
3944            RequestConverter.buildRemoveReplicationPeerRequest(peerId));
3945        }
3946      });
3947    return new ReplicationFuture(this, peerId, response.getProcId(),
3948      () -> "REMOVE_REPLICATION_PEER");
3949  }
3950
3951  @Override
3952  public void enableReplicationPeer(final String peerId) throws IOException {
3953    get(enableReplicationPeerAsync(peerId), this.syncWaitTimeout, TimeUnit.MILLISECONDS);
3954  }
3955
3956  @Override
3957  public Future<Void> enableReplicationPeerAsync(final String peerId) throws IOException {
3958    EnableReplicationPeerResponse response =
3959      executeCallable(new MasterCallable<EnableReplicationPeerResponse>(getConnection(),
3960          getRpcControllerFactory()) {
3961        @Override
3962        protected EnableReplicationPeerResponse rpcCall() throws Exception {
3963          return master.enableReplicationPeer(getRpcController(),
3964            RequestConverter.buildEnableReplicationPeerRequest(peerId));
3965        }
3966      });
3967    return new ReplicationFuture(this, peerId, response.getProcId(),
3968      () -> "ENABLE_REPLICATION_PEER");
3969  }
3970
3971  @Override
3972  public void disableReplicationPeer(final String peerId) throws IOException {
3973    get(disableReplicationPeerAsync(peerId), this.syncWaitTimeout, TimeUnit.MILLISECONDS);
3974  }
3975
3976  @Override
3977  public Future<Void> disableReplicationPeerAsync(final String peerId) throws IOException {
3978    DisableReplicationPeerResponse response =
3979      executeCallable(new MasterCallable<DisableReplicationPeerResponse>(getConnection(),
3980          getRpcControllerFactory()) {
3981        @Override
3982        protected DisableReplicationPeerResponse rpcCall() throws Exception {
3983          return master.disableReplicationPeer(getRpcController(),
3984            RequestConverter.buildDisableReplicationPeerRequest(peerId));
3985        }
3986      });
3987    return new ReplicationFuture(this, peerId, response.getProcId(),
3988      () -> "DISABLE_REPLICATION_PEER");
3989  }
3990
3991  @Override
3992  public ReplicationPeerConfig getReplicationPeerConfig(final String peerId) throws IOException {
3993    return executeCallable(new MasterCallable<ReplicationPeerConfig>(getConnection(),
3994        getRpcControllerFactory()) {
3995      @Override
3996      protected ReplicationPeerConfig rpcCall() throws Exception {
3997        GetReplicationPeerConfigResponse response = master.getReplicationPeerConfig(
3998          getRpcController(), RequestConverter.buildGetReplicationPeerConfigRequest(peerId));
3999        return ReplicationPeerConfigUtil.convert(response.getPeerConfig());
4000      }
4001    });
4002  }
4003
4004  @Override
4005  public void updateReplicationPeerConfig(final String peerId,
4006      final ReplicationPeerConfig peerConfig) throws IOException {
4007    get(updateReplicationPeerConfigAsync(peerId, peerConfig), this.syncWaitTimeout,
4008      TimeUnit.MILLISECONDS);
4009  }
4010
4011  @Override
4012  public Future<Void> updateReplicationPeerConfigAsync(final String peerId,
4013      final ReplicationPeerConfig peerConfig) throws IOException {
4014    UpdateReplicationPeerConfigResponse response =
4015      executeCallable(new MasterCallable<UpdateReplicationPeerConfigResponse>(getConnection(),
4016          getRpcControllerFactory()) {
4017        @Override
4018        protected UpdateReplicationPeerConfigResponse rpcCall() throws Exception {
4019          return master.updateReplicationPeerConfig(getRpcController(),
4020            RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig));
4021        }
4022      });
4023    return new ReplicationFuture(this, peerId, response.getProcId(),
4024      () -> "UPDATE_REPLICATION_PEER_CONFIG");
4025  }
4026
4027  @Override
4028  public void appendReplicationPeerTableCFs(String id,
4029      Map<TableName, List<String>> tableCfs)
4030      throws ReplicationException, IOException {
4031    if (tableCfs == null) {
4032      throw new ReplicationException("tableCfs is null");
4033    }
4034    ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
4035    ReplicationPeerConfig newPeerConfig =
4036        ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
4037    updateReplicationPeerConfig(id, newPeerConfig);
4038  }
4039
4040  @Override
4041  public void removeReplicationPeerTableCFs(String id,
4042      Map<TableName, List<String>> tableCfs)
4043      throws ReplicationException, IOException {
4044    if (tableCfs == null) {
4045      throw new ReplicationException("tableCfs is null");
4046    }
4047    ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
4048    ReplicationPeerConfig newPeerConfig =
4049        ReplicationPeerConfigUtil.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
4050    updateReplicationPeerConfig(id, newPeerConfig);
4051  }
4052
4053  @Override
4054  public List<ReplicationPeerDescription> listReplicationPeers() throws IOException {
4055    return listReplicationPeers((Pattern)null);
4056  }
4057
4058  @Override
4059  public List<ReplicationPeerDescription> listReplicationPeers(Pattern pattern)
4060      throws IOException {
4061    return executeCallable(new MasterCallable<List<ReplicationPeerDescription>>(getConnection(),
4062        getRpcControllerFactory()) {
4063      @Override
4064      protected List<ReplicationPeerDescription> rpcCall() throws Exception {
4065        List<ReplicationProtos.ReplicationPeerDescription> peersList = master.listReplicationPeers(
4066          getRpcController(), RequestConverter.buildListReplicationPeersRequest(pattern))
4067            .getPeerDescList();
4068        List<ReplicationPeerDescription> result = new ArrayList<>(peersList.size());
4069        for (ReplicationProtos.ReplicationPeerDescription peer : peersList) {
4070          result.add(ReplicationPeerConfigUtil.toReplicationPeerDescription(peer));
4071        }
4072        return result;
4073      }
4074    });
4075  }
4076
4077  @Override
4078  public void decommissionRegionServers(List<ServerName> servers, boolean offload)
4079      throws IOException {
4080    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
4081      @Override
4082      public Void rpcCall() throws ServiceException {
4083        master.decommissionRegionServers(getRpcController(),
4084          RequestConverter.buildDecommissionRegionServersRequest(servers, offload));
4085        return null;
4086      }
4087    });
4088  }
4089
4090  @Override
4091  public List<ServerName> listDecommissionedRegionServers() throws IOException {
4092    return executeCallable(new MasterCallable<List<ServerName>>(getConnection(),
4093              getRpcControllerFactory()) {
4094      @Override
4095      public List<ServerName> rpcCall() throws ServiceException {
4096        ListDecommissionedRegionServersRequest req =
4097            ListDecommissionedRegionServersRequest.newBuilder().build();
4098        List<ServerName> servers = new ArrayList<>();
4099        for (HBaseProtos.ServerName server : master
4100            .listDecommissionedRegionServers(getRpcController(), req).getServerNameList()) {
4101          servers.add(ProtobufUtil.toServerName(server));
4102        }
4103        return servers;
4104      }
4105    });
4106  }
4107
4108  @Override
4109  public void recommissionRegionServer(ServerName server, List<byte[]> encodedRegionNames)
4110      throws IOException {
4111    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
4112      @Override
4113      public Void rpcCall() throws ServiceException {
4114        master.recommissionRegionServer(getRpcController(),
4115          RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames));
4116        return null;
4117      }
4118    });
4119  }
4120
4121  @Override
4122  public List<TableCFs> listReplicatedTableCFs() throws IOException {
4123    List<TableCFs> replicatedTableCFs = new ArrayList<>();
4124    List<TableDescriptor> tables = listTableDescriptors();
4125    tables.forEach(table -> {
4126      Map<String, Integer> cfs = new HashMap<>();
4127      Stream.of(table.getColumnFamilies())
4128          .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
4129          .forEach(column -> {
4130            cfs.put(column.getNameAsString(), column.getScope());
4131          });
4132      if (!cfs.isEmpty()) {
4133        replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
4134      }
4135    });
4136    return replicatedTableCFs;
4137  }
4138
4139  @Override
4140  public void enableTableReplication(final TableName tableName) throws IOException {
4141    if (tableName == null) {
4142      throw new IllegalArgumentException("Table name cannot be null");
4143    }
4144    if (!tableExists(tableName)) {
4145      throw new TableNotFoundException("Table '" + tableName.getNameAsString()
4146          + "' does not exists.");
4147    }
4148    byte[][] splits = getTableSplits(tableName);
4149    checkAndSyncTableDescToPeers(tableName, splits);
4150    setTableRep(tableName, true);
4151  }
4152
4153  @Override
4154  public void disableTableReplication(final TableName tableName) throws IOException {
4155    if (tableName == null) {
4156      throw new IllegalArgumentException("Table name is null");
4157    }
4158    if (!tableExists(tableName)) {
4159      throw new TableNotFoundException("Table '" + tableName.getNameAsString()
4160          + "' does not exists.");
4161    }
4162    setTableRep(tableName, false);
4163  }
4164
4165  /**
4166   * Connect to peer and check the table descriptor on peer:
4167   * <ol>
4168   * <li>Create the same table on peer when not exist.</li>
4169   * <li>Throw an exception if the table already has replication enabled on any of the column
4170   * families.</li>
4171   * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
4172   * </ol>
4173   * @param tableName name of the table to sync to the peer
4174   * @param splits table split keys
4175   * @throws IOException
4176   */
4177  private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits)
4178      throws IOException {
4179    List<ReplicationPeerDescription> peers = listReplicationPeers();
4180    if (peers == null || peers.size() <= 0) {
4181      throw new IllegalArgumentException("Found no peer cluster for replication.");
4182    }
4183
4184    for (ReplicationPeerDescription peerDesc : peers) {
4185      if (peerDesc.getPeerConfig().needToReplicate(tableName)) {
4186        Configuration peerConf =
4187            ReplicationPeerConfigUtil.getPeerClusterConfiguration(this.conf, peerDesc);
4188        try (Connection conn = ConnectionFactory.createConnection(peerConf);
4189            Admin repHBaseAdmin = conn.getAdmin()) {
4190          TableDescriptor tableDesc = getDescriptor(tableName);
4191          TableDescriptor peerTableDesc = null;
4192          if (!repHBaseAdmin.tableExists(tableName)) {
4193            repHBaseAdmin.createTable(tableDesc, splits);
4194          } else {
4195            peerTableDesc = repHBaseAdmin.getDescriptor(tableName);
4196            if (peerTableDesc == null) {
4197              throw new IllegalArgumentException("Failed to get table descriptor for table "
4198                  + tableName.getNameAsString() + " from peer cluster " + peerDesc.getPeerId());
4199            }
4200            if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc,
4201              tableDesc) != 0) {
4202              throw new IllegalArgumentException("Table " + tableName.getNameAsString()
4203                  + " exists in peer cluster " + peerDesc.getPeerId()
4204                  + ", but the table descriptors are not same when compared with source cluster."
4205                  + " Thus can not enable the table's replication switch.");
4206            }
4207          }
4208        }
4209      }
4210    }
4211  }
4212
4213  /**
4214   * Set the table's replication switch if the table's replication switch is already not set.
4215   * @param tableName name of the table
4216   * @param enableRep is replication switch enable or disable
4217   * @throws IOException if a remote or network exception occurs
4218   */
4219  private void setTableRep(final TableName tableName, boolean enableRep) throws IOException {
4220    TableDescriptor tableDesc = getDescriptor(tableName);
4221    if (!tableDesc.matchReplicationScope(enableRep)) {
4222      int scope =
4223          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
4224      modifyTable(TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build());
4225    }
4226  }
4227
4228  @Override
4229  public void clearCompactionQueues(final ServerName sn, final Set<String> queues)
4230    throws IOException, InterruptedException {
4231    if (queues == null || queues.size() == 0) {
4232      throw new IllegalArgumentException("queues cannot be null or empty");
4233    }
4234    final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
4235    Callable<Void> callable = new Callable<Void>() {
4236      @Override
4237      public Void call() throws Exception {
4238        // TODO: There is no timeout on this controller. Set one!
4239        HBaseRpcController controller = rpcControllerFactory.newController();
4240        ClearCompactionQueuesRequest request =
4241                RequestConverter.buildClearCompactionQueuesRequest(queues);
4242        admin.clearCompactionQueues(controller, request);
4243        return null;
4244      }
4245    };
4246    ProtobufUtil.call(callable);
4247  }
4248
4249  @Override
4250  public List<ServerName> clearDeadServers(List<ServerName> servers) throws IOException {
4251    return executeCallable(new MasterCallable<List<ServerName>>(getConnection(),
4252            getRpcControllerFactory()) {
4253      @Override
4254      protected List<ServerName> rpcCall() throws Exception {
4255        ClearDeadServersRequest req = RequestConverter.
4256          buildClearDeadServersRequest(servers == null? Collections.EMPTY_LIST: servers);
4257        return ProtobufUtil.toServerNameList(
4258                master.clearDeadServers(getRpcController(), req).getServerNameList());
4259      }
4260    });
4261  }
4262
4263  @Override
4264  public void cloneTableSchema(final TableName tableName, final TableName newTableName,
4265      final boolean preserveSplits) throws IOException {
4266    checkTableExists(tableName);
4267    if (tableExists(newTableName)) {
4268      throw new TableExistsException(newTableName);
4269    }
4270    TableDescriptor htd = TableDescriptorBuilder.copy(newTableName, getTableDescriptor(tableName));
4271    if (preserveSplits) {
4272      createTable(htd, getTableSplits(tableName));
4273    } else {
4274      createTable(htd);
4275    }
4276  }
4277}