001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import com.google.protobuf.Descriptors;
021import com.google.protobuf.Message;
022import com.google.protobuf.RpcController;
023import java.io.Closeable;
024import java.io.IOException;
025import java.io.InterruptedIOException;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.Collections;
029import java.util.EnumSet;
030import java.util.HashMap;
031import java.util.Iterator;
032import java.util.LinkedList;
033import java.util.List;
034import java.util.Map;
035import java.util.Set;
036import java.util.concurrent.Callable;
037import java.util.concurrent.ExecutionException;
038import java.util.concurrent.Future;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.TimeoutException;
041import java.util.concurrent.atomic.AtomicInteger;
042import java.util.concurrent.atomic.AtomicReference;
043import java.util.function.Supplier;
044import java.util.regex.Pattern;
045import java.util.stream.Collectors;
046import java.util.stream.Stream;
047import org.apache.hadoop.conf.Configuration;
048import org.apache.hadoop.hbase.CacheEvictionStats;
049import org.apache.hadoop.hbase.CacheEvictionStatsBuilder;
050import org.apache.hadoop.hbase.ClusterMetrics;
051import org.apache.hadoop.hbase.ClusterMetrics.Option;
052import org.apache.hadoop.hbase.ClusterMetricsBuilder;
053import org.apache.hadoop.hbase.DoNotRetryIOException;
054import org.apache.hadoop.hbase.HBaseConfiguration;
055import org.apache.hadoop.hbase.HConstants;
056import org.apache.hadoop.hbase.HRegionLocation;
057import org.apache.hadoop.hbase.MasterNotRunningException;
058import org.apache.hadoop.hbase.MetaTableAccessor;
059import org.apache.hadoop.hbase.NamespaceDescriptor;
060import org.apache.hadoop.hbase.NamespaceNotFoundException;
061import org.apache.hadoop.hbase.NotServingRegionException;
062import org.apache.hadoop.hbase.RegionLocations;
063import org.apache.hadoop.hbase.RegionMetrics;
064import org.apache.hadoop.hbase.RegionMetricsBuilder;
065import org.apache.hadoop.hbase.ServerName;
066import org.apache.hadoop.hbase.TableExistsException;
067import org.apache.hadoop.hbase.TableName;
068import org.apache.hadoop.hbase.TableNotDisabledException;
069import org.apache.hadoop.hbase.TableNotFoundException;
070import org.apache.hadoop.hbase.UnknownRegionException;
071import org.apache.hadoop.hbase.ZooKeeperConnectionException;
072import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
073import org.apache.hadoop.hbase.client.replication.TableCFs;
074import org.apache.hadoop.hbase.client.security.SecurityCapability;
075import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
076import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
077import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
078import org.apache.hadoop.hbase.ipc.HBaseRpcController;
079import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
080import org.apache.hadoop.hbase.quotas.QuotaFilter;
081import org.apache.hadoop.hbase.quotas.QuotaRetriever;
082import org.apache.hadoop.hbase.quotas.QuotaSettings;
083import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
084import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
085import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
086import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
087import org.apache.hadoop.hbase.replication.SyncReplicationState;
088import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
089import org.apache.hadoop.hbase.security.access.Permission;
090import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
091import org.apache.hadoop.hbase.security.access.UserPermission;
092import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
093import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
094import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
095import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
096import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
097import org.apache.hadoop.hbase.util.Addressing;
098import org.apache.hadoop.hbase.util.Bytes;
099import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
100import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
101import org.apache.hadoop.hbase.util.Pair;
102import org.apache.hadoop.ipc.RemoteException;
103import org.apache.hadoop.util.StringUtils;
104import org.apache.yetus.audience.InterfaceAudience;
105import org.apache.yetus.audience.InterfaceStability;
106import org.slf4j.Logger;
107import org.slf4j.LoggerFactory;
108
109import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
110import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
111import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
112
113import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
114import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
115import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
116import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
117import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
147import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
148import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
149import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
150import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
153import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
227
228/**
229 * HBaseAdmin is no longer a client API. It is marked InterfaceAudience.Private indicating that
230 * this is an HBase-internal class as defined in
231 * https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html
232 * There are no guarantees for backwards source / binary compatibility and methods or class can
233 * change or go away without deprecation.
234 * Use {@link Connection#getAdmin()} to obtain an instance of {@link Admin} instead of constructing
235 * an HBaseAdmin directly.
236 *
237 * <p>Connection should be an <i>unmanaged</i> connection obtained via
238 * {@link ConnectionFactory#createConnection(Configuration)}
239 *
240 * @see ConnectionFactory
241 * @see Connection
242 * @see Admin
243 */
244@InterfaceAudience.Private
245public class HBaseAdmin implements Admin {
  private static final Logger LOG = LoggerFactory.getLogger(HBaseAdmin.class);

  /** Cluster connection this admin operates on; handed in through the constructor. */
  private ClusterConnection connection;

  /** Configuration obtained from the connection when this admin is constructed. */
  private final Configuration conf;
  /**
   * Base retry pause read from {@link HConstants#HBASE_CLIENT_PAUSE}; {@code getPauseTime}
   * multiplies it by an entry of {@link HConstants#RETRY_BACKOFF}.
   */
  private final long pause;
  /** Value of {@link HConstants#HBASE_CLIENT_RETRIES_NUMBER} at construction time. */
  private final int numRetries;
  /**
   * Value of "hbase.client.sync.wait.timeout.msec" (default 10 minutes); used as the timeout, in
   * milliseconds, when a synchronous call waits on its asynchronous counterpart.
   */
  private final int syncWaitTimeout;
  /** Set to {@code true} by {@link #abort(String, Throwable)}; reported by {@link #isAborted()}. */
  private boolean aborted;
  /** Value of {@link HConstants#HBASE_CLIENT_OPERATION_TIMEOUT} at construction time. */
  private int operationTimeout;
  /** Value of {@link HConstants#HBASE_RPC_TIMEOUT_KEY} at construction time. */
  private int rpcTimeout;
  /** Value of "hbase.client.procedure.future.get.timeout.msec" (default 10 minutes). */
  private int getProcedureTimeout;

  /** Caller factory obtained from the connection. */
  private RpcRetryingCallerFactory rpcCallerFactory;
  /** Controller factory obtained from the connection; creates the RPC controllers used here. */
  private RpcControllerFactory rpcControllerFactory;

  /** Nonce generator of the connection; supplies nonce group and nonce for master requests. */
  private NonceGenerator ng;
263
264  @Override
265  public int getOperationTimeout() {
266    return operationTimeout;
267  }
268
269  @Override
270  public int getSyncWaitTimeout() {
271    return syncWaitTimeout;
272  }
273
274  HBaseAdmin(ClusterConnection connection) throws IOException {
275    this.conf = connection.getConfiguration();
276    this.connection = connection;
277
278    // TODO: receive ConnectionConfiguration here rather than re-parsing these configs every time.
279    this.pause = this.conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
280        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
281    this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
282        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
283    this.operationTimeout = this.conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
284        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
285    this.rpcTimeout = this.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
286        HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
287    this.syncWaitTimeout = this.conf.getInt(
288      "hbase.client.sync.wait.timeout.msec", 10 * 60000); // 10min
289    this.getProcedureTimeout =
290        this.conf.getInt("hbase.client.procedure.future.get.timeout.msec", 10 * 60000); // 10min
291
292    this.rpcCallerFactory = connection.getRpcRetryingCallerFactory();
293    this.rpcControllerFactory = connection.getRpcControllerFactory();
294
295    this.ng = this.connection.getNonceGenerator();
296  }
297
298  @Override
299  public void abort(String why, Throwable e) {
300    // Currently does nothing but throw the passed message and exception
301    this.aborted = true;
302    throw new RuntimeException(why, e);
303  }
304
305  @Override
306  public boolean isAborted() {
307    return this.aborted;
308  }
309
310  @Override
311  public boolean abortProcedure(final long procId, final boolean mayInterruptIfRunning)
312  throws IOException {
313    return get(abortProcedureAsync(procId, mayInterruptIfRunning), this.syncWaitTimeout,
314      TimeUnit.MILLISECONDS);
315  }
316
317  @Override
318  public Future<Boolean> abortProcedureAsync(final long procId, final boolean mayInterruptIfRunning)
319      throws IOException {
320    Boolean abortProcResponse =
321        executeCallable(new MasterCallable<AbortProcedureResponse>(getConnection(),
322            getRpcControllerFactory()) {
323      @Override
324      protected AbortProcedureResponse rpcCall() throws Exception {
325        AbortProcedureRequest abortProcRequest =
326            AbortProcedureRequest.newBuilder().setProcId(procId).build();
327        return master.abortProcedure(getRpcController(), abortProcRequest);
328      }
329    }).getIsProcedureAborted();
330    return new AbortProcedureFuture(this, procId, abortProcResponse);
331  }
332
333  @Override
334  public List<TableDescriptor> listTableDescriptors() throws IOException {
335    return listTableDescriptors((Pattern)null, false);
336  }
337
338  @Override
339  public List<TableDescriptor> listTableDescriptors(Pattern pattern, boolean includeSysTables)
340      throws IOException {
341    return executeCallable(new MasterCallable<List<TableDescriptor>>(getConnection(),
342        getRpcControllerFactory()) {
343      @Override
344      protected List<TableDescriptor> rpcCall() throws Exception {
345        GetTableDescriptorsRequest req =
346            RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables);
347        return ProtobufUtil.toTableDescriptorList(master.getTableDescriptors(getRpcController(),
348            req));
349      }
350    });
351  }
352
353  @Override
354  public TableDescriptor getDescriptor(TableName tableName)
355      throws TableNotFoundException, IOException {
356    return getTableDescriptor(tableName, getConnection(), rpcCallerFactory, rpcControllerFactory,
357       operationTimeout, rpcTimeout);
358  }
359
360  @Override
361  public Future<Void> modifyTableAsync(TableDescriptor td) throws IOException {
362    ModifyTableResponse response = executeCallable(
363      new MasterCallable<ModifyTableResponse>(getConnection(), getRpcControllerFactory()) {
364        Long nonceGroup = ng.getNonceGroup();
365        Long nonce = ng.newNonce();
366        @Override
367        protected ModifyTableResponse rpcCall() throws Exception {
368          setPriority(td.getTableName());
369          ModifyTableRequest request = RequestConverter.buildModifyTableRequest(
370            td.getTableName(), td, nonceGroup, nonce);
371          return master.modifyTable(getRpcController(), request);
372        }
373      });
374    return new ModifyTableFuture(this, td.getTableName(), response);
375  }
376
377  @Override
378  public List<TableDescriptor> listTableDescriptorsByNamespace(byte[] name) throws IOException {
379    return executeCallable(new MasterCallable<List<TableDescriptor>>(getConnection(),
380        getRpcControllerFactory()) {
381      @Override
382      protected List<TableDescriptor> rpcCall() throws Exception {
383        return master.listTableDescriptorsByNamespace(getRpcController(),
384                ListTableDescriptorsByNamespaceRequest.newBuilder()
385                  .setNamespaceName(Bytes.toString(name)).build())
386                .getTableSchemaList()
387                .stream()
388                .map(ProtobufUtil::toTableDescriptor)
389                .collect(Collectors.toList());
390      }
391    });
392  }
393
394  @Override
395  public List<TableDescriptor> listTableDescriptors(List<TableName> tableNames) throws IOException {
396    return executeCallable(new MasterCallable<List<TableDescriptor>>(getConnection(),
397        getRpcControllerFactory()) {
398      @Override
399      protected List<TableDescriptor> rpcCall() throws Exception {
400        GetTableDescriptorsRequest req =
401            RequestConverter.buildGetTableDescriptorsRequest(tableNames);
402          return ProtobufUtil.toTableDescriptorList(master.getTableDescriptors(getRpcController(),
403              req));
404      }
405    });
406  }
407
408  @Override
409  public List<RegionInfo> getRegions(final ServerName sn) throws IOException {
410    AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
411    // TODO: There is no timeout on this controller. Set one!
412    HBaseRpcController controller = rpcControllerFactory.newController();
413    return ProtobufUtil.getOnlineRegions(controller, admin);
414  }
415
416  @Override
417  public List<RegionInfo> getRegions(TableName tableName) throws IOException {
418    if (TableName.isMetaTableName(tableName)) {
419      return Arrays.asList(RegionInfoBuilder.FIRST_META_REGIONINFO);
420    } else {
421      return MetaTableAccessor.getTableRegions(connection, tableName, true);
422    }
423  }
424
425  private static class AbortProcedureFuture extends ProcedureFuture<Boolean> {
426    private boolean isAbortInProgress;
427
428    public AbortProcedureFuture(
429        final HBaseAdmin admin,
430        final Long procId,
431        final Boolean abortProcResponse) {
432      super(admin, procId);
433      this.isAbortInProgress = abortProcResponse;
434    }
435
436    @Override
437    public Boolean get(long timeout, TimeUnit unit)
438        throws InterruptedException, ExecutionException, TimeoutException {
439      if (!this.isAbortInProgress) {
440        return false;
441      }
442      super.get(timeout, unit);
443      return true;
444    }
445  }
446
447  /** @return Connection used by this object. */
448  @Override
449  public Connection getConnection() {
450    return connection;
451  }
452
453  @Override
454  public boolean tableExists(final TableName tableName) throws IOException {
455    return executeCallable(new RpcRetryingCallable<Boolean>() {
456      @Override
457      protected Boolean rpcCall(int callTimeout) throws Exception {
458        return MetaTableAccessor.tableExists(connection, tableName);
459      }
460    });
461  }
462
463  @Override
464  public TableName[] listTableNames() throws IOException {
465    return listTableNames((Pattern)null, false);
466  }
467
468  @Override
469  public TableName[] listTableNames(Pattern pattern) throws IOException {
470    return listTableNames(pattern, false);
471  }
472
473  @Override
474  public TableName[] listTableNames(final Pattern pattern, final boolean includeSysTables)
475      throws IOException {
476    return executeCallable(new MasterCallable<TableName[]>(getConnection(),
477        getRpcControllerFactory()) {
478      @Override
479      protected TableName[] rpcCall() throws Exception {
480        GetTableNamesRequest req =
481            RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables);
482        return ProtobufUtil.getTableNameArray(master.getTableNames(getRpcController(), req)
483            .getTableNamesList());
484      }
485    });
486  }
487
488  static TableDescriptor getTableDescriptor(final TableName tableName, Connection connection,
489      RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory,
490      int operationTimeout, int rpcTimeout) throws IOException {
491    if (tableName == null) return null;
492    TableDescriptor td =
493        executeCallable(new MasterCallable<TableDescriptor>(connection, rpcControllerFactory) {
494      @Override
495      protected TableDescriptor rpcCall() throws Exception {
496        GetTableDescriptorsRequest req =
497            RequestConverter.buildGetTableDescriptorsRequest(tableName);
498        GetTableDescriptorsResponse htds = master.getTableDescriptors(getRpcController(), req);
499        if (!htds.getTableSchemaList().isEmpty()) {
500          return ProtobufUtil.toTableDescriptor(htds.getTableSchemaList().get(0));
501        }
502        return null;
503      }
504    }, rpcCallerFactory, operationTimeout, rpcTimeout);
505    if (td != null) {
506      return td;
507    }
508    throw new TableNotFoundException(tableName.getNameAsString());
509  }
510
511  private long getPauseTime(int tries) {
512    int triesCount = tries;
513    if (triesCount >= HConstants.RETRY_BACKOFF.length) {
514      triesCount = HConstants.RETRY_BACKOFF.length - 1;
515    }
516    return this.pause * HConstants.RETRY_BACKOFF[triesCount];
517  }
518
519  @Override
520  public void createTable(TableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions)
521      throws IOException {
522    if (numRegions < 3) {
523      throw new IllegalArgumentException("Must create at least three regions");
524    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
525      throw new IllegalArgumentException("Start key must be smaller than end key");
526    }
527    if (numRegions == 3) {
528      createTable(desc, new byte[][] { startKey, endKey });
529      return;
530    }
531    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
532    if (splitKeys == null || splitKeys.length != numRegions - 1) {
533      throw new IllegalArgumentException("Unable to split key range into enough regions");
534    }
535    createTable(desc, splitKeys);
536  }
537
538  @Override
539  public Future<Void> createTableAsync(final TableDescriptor desc, final byte[][] splitKeys)
540      throws IOException {
541    if (desc.getTableName() == null) {
542      throw new IllegalArgumentException("TableName cannot be null");
543    }
544    if (splitKeys != null && splitKeys.length > 0) {
545      Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
546      // Verify there are no duplicate split keys
547      byte[] lastKey = null;
548      for (byte[] splitKey : splitKeys) {
549        if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
550          throw new IllegalArgumentException(
551              "Empty split key must not be passed in the split keys.");
552        }
553        if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
554          throw new IllegalArgumentException("All split keys must be unique, " +
555            "found duplicate: " + Bytes.toStringBinary(splitKey) +
556            ", " + Bytes.toStringBinary(lastKey));
557        }
558        lastKey = splitKey;
559      }
560    }
561
562    CreateTableResponse response = executeCallable(
563      new MasterCallable<CreateTableResponse>(getConnection(), getRpcControllerFactory()) {
564        Long nonceGroup = ng.getNonceGroup();
565        Long nonce = ng.newNonce();
566        @Override
567        protected CreateTableResponse rpcCall() throws Exception {
568          setPriority(desc.getTableName());
569          CreateTableRequest request = RequestConverter.buildCreateTableRequest(
570            desc, splitKeys, nonceGroup, nonce);
571          return master.createTable(getRpcController(), request);
572        }
573      });
574    return new CreateTableFuture(this, desc, splitKeys, response);
575  }
576
577  private static class CreateTableFuture extends TableFuture<Void> {
578    private final TableDescriptor desc;
579    private final byte[][] splitKeys;
580
581    public CreateTableFuture(final HBaseAdmin admin, final TableDescriptor desc,
582        final byte[][] splitKeys, final CreateTableResponse response) {
583      super(admin, desc.getTableName(),
584              (response != null && response.hasProcId()) ? response.getProcId() : null);
585      this.splitKeys = splitKeys;
586      this.desc = desc;
587    }
588
589    @Override
590    protected TableDescriptor getDescriptor() {
591      return desc;
592    }
593
594    @Override
595    public String getOperationType() {
596      return "CREATE";
597    }
598
599    @Override
600    protected Void waitOperationResult(final long deadlineTs) throws IOException, TimeoutException {
601      waitForTableEnabled(deadlineTs);
602      waitForAllRegionsOnline(deadlineTs, splitKeys);
603      return null;
604    }
605  }
606
607  @Override
608  public Future<Void> deleteTableAsync(final TableName tableName) throws IOException {
609    DeleteTableResponse response = executeCallable(
610      new MasterCallable<DeleteTableResponse>(getConnection(), getRpcControllerFactory()) {
611        Long nonceGroup = ng.getNonceGroup();
612        Long nonce = ng.newNonce();
613        @Override
614        protected DeleteTableResponse rpcCall() throws Exception {
615          setPriority(tableName);
616          DeleteTableRequest req =
617              RequestConverter.buildDeleteTableRequest(tableName, nonceGroup,nonce);
618          return master.deleteTable(getRpcController(), req);
619        }
620      });
621    return new DeleteTableFuture(this, tableName, response);
622  }
623
624  private static class DeleteTableFuture extends TableFuture<Void> {
625    public DeleteTableFuture(final HBaseAdmin admin, final TableName tableName,
626        final DeleteTableResponse response) {
627      super(admin, tableName,
628              (response != null && response.hasProcId()) ? response.getProcId() : null);
629    }
630
631    @Override
632    public String getOperationType() {
633      return "DELETE";
634    }
635
636    @Override
637    protected Void waitOperationResult(final long deadlineTs)
638        throws IOException, TimeoutException {
639      waitTableNotFound(deadlineTs);
640      return null;
641    }
642
643    @Override
644    protected Void postOperationResult(final Void result, final long deadlineTs)
645        throws IOException, TimeoutException {
646      // Delete cached information to prevent clients from using old locations
647      ((ClusterConnection) getAdmin().getConnection()).clearRegionCache(getTableName());
648      return super.postOperationResult(result, deadlineTs);
649    }
650  }
651
652  @Override
653  public Future<Void> truncateTableAsync(final TableName tableName, final boolean preserveSplits)
654      throws IOException {
655    TruncateTableResponse response =
656        executeCallable(new MasterCallable<TruncateTableResponse>(getConnection(),
657            getRpcControllerFactory()) {
658          Long nonceGroup = ng.getNonceGroup();
659          Long nonce = ng.newNonce();
660          @Override
661          protected TruncateTableResponse rpcCall() throws Exception {
662            setPriority(tableName);
663            LOG.info("Started truncating " + tableName);
664            TruncateTableRequest req = RequestConverter.buildTruncateTableRequest(
665              tableName, preserveSplits, nonceGroup, nonce);
666            return master.truncateTable(getRpcController(), req);
667          }
668        });
669    return new TruncateTableFuture(this, tableName, preserveSplits, response);
670  }
671
672  private static class TruncateTableFuture extends TableFuture<Void> {
673    private final boolean preserveSplits;
674
675    public TruncateTableFuture(final HBaseAdmin admin, final TableName tableName,
676        final boolean preserveSplits, final TruncateTableResponse response) {
677      super(admin, tableName,
678             (response != null && response.hasProcId()) ? response.getProcId() : null);
679      this.preserveSplits = preserveSplits;
680    }
681
682    @Override
683    public String getOperationType() {
684      return "TRUNCATE";
685    }
686
687    @Override
688    protected Void waitOperationResult(final long deadlineTs) throws IOException, TimeoutException {
689      waitForTableEnabled(deadlineTs);
690      // once the table is enabled, we know the operation is done. so we can fetch the splitKeys
691      byte[][] splitKeys = preserveSplits ? getAdmin().getTableSplits(getTableName()) : null;
692      waitForAllRegionsOnline(deadlineTs, splitKeys);
693      return null;
694    }
695  }
696
697  private byte[][] getTableSplits(final TableName tableName) throws IOException {
698    byte[][] splits = null;
699    try (RegionLocator locator = getConnection().getRegionLocator(tableName)) {
700      byte[][] startKeys = locator.getStartKeys();
701      if (startKeys.length == 1) {
702        return splits;
703      }
704      splits = new byte[startKeys.length - 1][];
705      for (int i = 1; i < startKeys.length; i++) {
706        splits[i - 1] = startKeys[i];
707      }
708    }
709    return splits;
710  }
711
712  @Override
713  public Future<Void> enableTableAsync(final TableName tableName) throws IOException {
714    TableName.isLegalFullyQualifiedTableName(tableName.getName());
715    EnableTableResponse response = executeCallable(
716      new MasterCallable<EnableTableResponse>(getConnection(), getRpcControllerFactory()) {
717        Long nonceGroup = ng.getNonceGroup();
718        Long nonce = ng.newNonce();
719        @Override
720        protected EnableTableResponse rpcCall() throws Exception {
721          setPriority(tableName);
722          LOG.info("Started enable of " + tableName);
723          EnableTableRequest req =
724              RequestConverter.buildEnableTableRequest(tableName, nonceGroup, nonce);
725          return master.enableTable(getRpcController(),req);
726        }
727      });
728    return new EnableTableFuture(this, tableName, response);
729  }
730
731  private static class EnableTableFuture extends TableFuture<Void> {
732    public EnableTableFuture(final HBaseAdmin admin, final TableName tableName,
733        final EnableTableResponse response) {
734      super(admin, tableName,
735              (response != null && response.hasProcId()) ? response.getProcId() : null);
736    }
737
738    @Override
739    public String getOperationType() {
740      return "ENABLE";
741    }
742
743    @Override
744    protected Void waitOperationResult(final long deadlineTs) throws IOException, TimeoutException {
745      waitForTableEnabled(deadlineTs);
746      return null;
747    }
748  }
749
750  @Override
751  public Future<Void> disableTableAsync(final TableName tableName) throws IOException {
752    TableName.isLegalFullyQualifiedTableName(tableName.getName());
753    DisableTableResponse response = executeCallable(
754      new MasterCallable<DisableTableResponse>(getConnection(), getRpcControllerFactory()) {
755        Long nonceGroup = ng.getNonceGroup();
756        Long nonce = ng.newNonce();
757        @Override
758        protected DisableTableResponse rpcCall() throws Exception {
759          setPriority(tableName);
760          LOG.info("Started disable of " + tableName);
761          DisableTableRequest req =
762              RequestConverter.buildDisableTableRequest(
763                tableName, nonceGroup, nonce);
764          return master.disableTable(getRpcController(), req);
765        }
766      });
767    return new DisableTableFuture(this, tableName, response);
768  }
769
770  private static class DisableTableFuture extends TableFuture<Void> {
771    public DisableTableFuture(final HBaseAdmin admin, final TableName tableName,
772        final DisableTableResponse response) {
773      super(admin, tableName,
774              (response != null && response.hasProcId()) ? response.getProcId() : null);
775    }
776
777    @Override
778    public String getOperationType() {
779      return "DISABLE";
780    }
781
782    @Override
783    protected Void waitOperationResult(long deadlineTs) throws IOException, TimeoutException {
784      waitForTableDisabled(deadlineTs);
785      return null;
786    }
787  }
788
789  @Override
790  public boolean isTableEnabled(final TableName tableName) throws IOException {
791    checkTableExists(tableName);
792    return executeCallable(new RpcRetryingCallable<Boolean>() {
793      @Override
794      protected Boolean rpcCall(int callTimeout) throws Exception {
795        TableState tableState = MetaTableAccessor.getTableState(getConnection(), tableName);
796        if (tableState == null) {
797          throw new TableNotFoundException(tableName);
798        }
799        return tableState.inStates(TableState.State.ENABLED);
800      }
801    });
802  }
803
804  @Override
805  public boolean isTableDisabled(TableName tableName) throws IOException {
806    checkTableExists(tableName);
807    return connection.isTableDisabled(tableName);
808  }
809
810  @Override
811  public boolean isTableAvailable(TableName tableName) throws IOException {
812    return connection.isTableAvailable(tableName, null);
813  }
814
815  @Override
816  public Future<Void> addColumnFamilyAsync(final TableName tableName,
817      final ColumnFamilyDescriptor columnFamily) throws IOException {
818    AddColumnResponse response =
819        executeCallable(new MasterCallable<AddColumnResponse>(getConnection(),
820            getRpcControllerFactory()) {
821          Long nonceGroup = ng.getNonceGroup();
822          Long nonce = ng.newNonce();
823          @Override
824          protected AddColumnResponse rpcCall() throws Exception {
825            setPriority(tableName);
826            AddColumnRequest req =
827                RequestConverter.buildAddColumnRequest(tableName, columnFamily, nonceGroup, nonce);
828            return master.addColumn(getRpcController(), req);
829          }
830        });
831    return new AddColumnFamilyFuture(this, tableName, response);
832  }
833
834  private static class AddColumnFamilyFuture extends ModifyTableFuture {
835    public AddColumnFamilyFuture(final HBaseAdmin admin, final TableName tableName,
836        final AddColumnResponse response) {
837      super(admin, tableName, (response != null && response.hasProcId()) ? response.getProcId()
838          : null);
839    }
840
841    @Override
842    public String getOperationType() {
843      return "ADD_COLUMN_FAMILY";
844    }
845  }
846
847  @Override
848  public Future<Void> deleteColumnFamilyAsync(final TableName tableName, final byte[] columnFamily)
849      throws IOException {
850    DeleteColumnResponse response =
851        executeCallable(new MasterCallable<DeleteColumnResponse>(getConnection(),
852            getRpcControllerFactory()) {
853          Long nonceGroup = ng.getNonceGroup();
854          Long nonce = ng.newNonce();
855          @Override
856          protected DeleteColumnResponse rpcCall() throws Exception {
857            setPriority(tableName);
858            DeleteColumnRequest req =
859                RequestConverter.buildDeleteColumnRequest(tableName, columnFamily,
860                  nonceGroup, nonce);
861            return master.deleteColumn(getRpcController(), req);
862          }
863        });
864    return new DeleteColumnFamilyFuture(this, tableName, response);
865  }
866
867  private static class DeleteColumnFamilyFuture extends ModifyTableFuture {
868    public DeleteColumnFamilyFuture(final HBaseAdmin admin, final TableName tableName,
869        final DeleteColumnResponse response) {
870      super(admin, tableName, (response != null && response.hasProcId()) ? response.getProcId()
871          : null);
872    }
873
874    @Override
875    public String getOperationType() {
876      return "DELETE_COLUMN_FAMILY";
877    }
878  }
879
880  @Override
881  public Future<Void> modifyColumnFamilyAsync(final TableName tableName,
882      final ColumnFamilyDescriptor columnFamily) throws IOException {
883    ModifyColumnResponse response =
884        executeCallable(new MasterCallable<ModifyColumnResponse>(getConnection(),
885            getRpcControllerFactory()) {
886          Long nonceGroup = ng.getNonceGroup();
887          Long nonce = ng.newNonce();
888          @Override
889          protected ModifyColumnResponse rpcCall() throws Exception {
890            setPriority(tableName);
891            ModifyColumnRequest req =
892                RequestConverter.buildModifyColumnRequest(tableName, columnFamily,
893                  nonceGroup, nonce);
894            return master.modifyColumn(getRpcController(), req);
895          }
896        });
897    return new ModifyColumnFamilyFuture(this, tableName, response);
898  }
899
900  private static class ModifyColumnFamilyFuture extends ModifyTableFuture {
901    public ModifyColumnFamilyFuture(final HBaseAdmin admin, final TableName tableName,
902        final ModifyColumnResponse response) {
903      super(admin, tableName, (response != null && response.hasProcId()) ? response.getProcId()
904          : null);
905    }
906
907    @Override
908    public String getOperationType() {
909      return "MODIFY_COLUMN_FAMILY";
910    }
911  }
912
913  @Override
914  public void flush(final TableName tableName) throws IOException {
915    checkTableExists(tableName);
916    if (isTableDisabled(tableName)) {
917      LOG.info("Table is disabled: " + tableName.getNameAsString());
918      return;
919    }
920    execProcedure("flush-table-proc", tableName.getNameAsString(), new HashMap<>());
921  }
922
923  @Override
924  public void flushRegion(final byte[] regionName) throws IOException {
925    Pair<RegionInfo, ServerName> regionServerPair = getRegion(regionName);
926    if (regionServerPair == null) {
927      throw new IllegalArgumentException("Unknown regionname: " + Bytes.toStringBinary(regionName));
928    }
929    if (regionServerPair.getSecond() == null) {
930      throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
931    }
932    final RegionInfo regionInfo = regionServerPair.getFirst();
933    ServerName serverName = regionServerPair.getSecond();
934    flush(this.connection.getAdmin(serverName), regionInfo);
935  }
936
937  private void flush(AdminService.BlockingInterface admin, final RegionInfo info)
938    throws IOException {
939    ProtobufUtil.call(() -> {
940      // TODO: There is no timeout on this controller. Set one!
941      HBaseRpcController controller = rpcControllerFactory.newController();
942      FlushRegionRequest request =
943        RequestConverter.buildFlushRegionRequest(info.getRegionName());
944      admin.flushRegion(controller, request);
945      return null;
946    });
947  }
948
949  @Override
950  public void flushRegionServer(ServerName serverName) throws IOException {
951    for (RegionInfo region : getRegions(serverName)) {
952      flush(this.connection.getAdmin(serverName), region);
953    }
954  }
955
956  /**
957   * {@inheritDoc}
958   */
959  @Override
960  public void compact(final TableName tableName)
961    throws IOException {
962    compact(tableName, null, false, CompactType.NORMAL);
963  }
964
965  @Override
966  public void compactRegion(final byte[] regionName)
967    throws IOException {
968    compactRegion(regionName, null, false);
969  }
970
971  /**
972   * {@inheritDoc}
973   */
974  @Override
975  public void compact(final TableName tableName, final byte[] columnFamily)
976    throws IOException {
977    compact(tableName, columnFamily, false, CompactType.NORMAL);
978  }
979
980  /**
981   * {@inheritDoc}
982   */
983  @Override
984  public void compactRegion(final byte[] regionName, final byte[] columnFamily)
985    throws IOException {
986    compactRegion(regionName, columnFamily, false);
987  }
988
989  @Override
990  public Map<ServerName, Boolean> compactionSwitch(boolean switchState, List<String>
991      serverNamesList) throws IOException {
992    List<ServerName> serverList = new ArrayList<>();
993    if (serverNamesList.isEmpty()) {
994      ClusterMetrics status = getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
995      serverList.addAll(status.getLiveServerMetrics().keySet());
996    } else {
997      for (String regionServerName: serverNamesList) {
998        ServerName serverName = null;
999        try {
1000          serverName = ServerName.valueOf(regionServerName);
1001        } catch (Exception e) {
1002          throw new IllegalArgumentException(String.format("Invalid ServerName format: %s",
1003              regionServerName));
1004        }
1005        if (serverName == null) {
1006          throw new IllegalArgumentException(String.format("Null ServerName: %s",
1007              regionServerName));
1008        }
1009        serverList.add(serverName);
1010      }
1011    }
1012    Map<ServerName, Boolean> res = new HashMap<>(serverList.size());
1013    for (ServerName serverName: serverList) {
1014      boolean prev_state = switchCompact(this.connection.getAdmin(serverName), switchState);
1015      res.put(serverName, prev_state);
1016    }
1017    return res;
1018  }
1019
1020  private Boolean switchCompact(AdminService.BlockingInterface admin, boolean onOrOff)
1021      throws IOException {
1022    return executeCallable(new RpcRetryingCallable<Boolean>() {
1023      @Override protected Boolean rpcCall(int callTimeout) throws Exception {
1024        HBaseRpcController controller = rpcControllerFactory.newController();
1025        CompactionSwitchRequest request =
1026            CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build();
1027        CompactionSwitchResponse compactionSwitchResponse =
1028            admin.compactionSwitch(controller, request);
1029        return compactionSwitchResponse.getPrevState();
1030      }
1031    });
1032  }
1033
1034  @Override
1035  public void compactRegionServer(final ServerName serverName) throws IOException {
1036    for (RegionInfo region : getRegions(serverName)) {
1037      compact(this.connection.getAdmin(serverName), region, false, null);
1038    }
1039  }
1040
1041  @Override
1042  public void majorCompactRegionServer(final ServerName serverName) throws IOException {
1043    for (RegionInfo region : getRegions(serverName)) {
1044      compact(this.connection.getAdmin(serverName), region, true, null);
1045    }
1046  }
1047
1048  @Override
1049  public void majorCompact(final TableName tableName)
1050  throws IOException {
1051    compact(tableName, null, true, CompactType.NORMAL);
1052  }
1053
1054  @Override
1055  public void majorCompactRegion(final byte[] regionName)
1056  throws IOException {
1057    compactRegion(regionName, null, true);
1058  }
1059
1060  /**
1061   * {@inheritDoc}
1062   */
1063  @Override
1064  public void majorCompact(final TableName tableName, final byte[] columnFamily)
1065  throws IOException {
1066    compact(tableName, columnFamily, true, CompactType.NORMAL);
1067  }
1068
1069  @Override
1070  public void majorCompactRegion(final byte[] regionName, final byte[] columnFamily)
1071  throws IOException {
1072    compactRegion(regionName, columnFamily, true);
1073  }
1074
1075  /**
1076   * Compact a table.
1077   * Asynchronous operation.
1078   *
1079   * @param tableName table or region to compact
1080   * @param columnFamily column family within a table or region
1081   * @param major True if we are to do a major compaction.
1082   * @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
1083   * @throws IOException if a remote or network exception occurs
1084   */
1085  private void compact(final TableName tableName, final byte[] columnFamily,final boolean major,
1086                       CompactType compactType) throws IOException {
1087    switch (compactType) {
1088      case MOB:
1089        compact(this.connection.getAdminForMaster(), RegionInfo.createMobRegionInfo(tableName),
1090            major, columnFamily);
1091        break;
1092      case NORMAL:
1093        checkTableExists(tableName);
1094        for (HRegionLocation loc :connection.locateRegions(tableName, false, false)) {
1095          ServerName sn = loc.getServerName();
1096          if (sn == null) {
1097            continue;
1098          }
1099          try {
1100            compact(this.connection.getAdmin(sn), loc.getRegion(), major, columnFamily);
1101          } catch (NotServingRegionException e) {
1102            if (LOG.isDebugEnabled()) {
1103              LOG.debug("Trying to" + (major ? " major" : "") + " compact " + loc.getRegion() +
1104                  ": " + StringUtils.stringifyException(e));
1105            }
1106          }
1107        }
1108        break;
1109      default:
1110        throw new IllegalArgumentException("Unknown compactType: " + compactType);
1111    }
1112  }
1113
1114  /**
1115   * Compact an individual region.
1116   * Asynchronous operation.
1117   *
1118   * @param regionName region to compact
1119   * @param columnFamily column family within a table or region
1120   * @param major True if we are to do a major compaction.
1121   * @throws IOException if a remote or network exception occurs
1122   * @throws InterruptedException
1123   */
1124  private void compactRegion(final byte[] regionName, final byte[] columnFamily,
1125      final boolean major) throws IOException {
1126    Pair<RegionInfo, ServerName> regionServerPair = getRegion(regionName);
1127    if (regionServerPair == null) {
1128      throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName));
1129    }
1130    if (regionServerPair.getSecond() == null) {
1131      throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
1132    }
1133    compact(this.connection.getAdmin(regionServerPair.getSecond()), regionServerPair.getFirst(),
1134      major, columnFamily);
1135  }
1136
1137  private void compact(AdminService.BlockingInterface admin, RegionInfo hri, boolean major,
1138      byte[] family) throws IOException {
1139    Callable<Void> callable = new Callable<Void>() {
1140      @Override
1141      public Void call() throws Exception {
1142        // TODO: There is no timeout on this controller. Set one!
1143        HBaseRpcController controller = rpcControllerFactory.newController();
1144        CompactRegionRequest request =
1145            RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family);
1146        admin.compactRegion(controller, request);
1147        return null;
1148      }
1149    };
1150    ProtobufUtil.call(callable);
1151  }
1152
1153  @Override
1154  public void move(byte[] encodedRegionName) throws IOException {
1155    move(encodedRegionName, (ServerName) null);
1156  }
1157
1158  public void move(final byte[] encodedRegionName, ServerName destServerName) throws IOException {
1159    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
1160      @Override
1161      protected Void rpcCall() throws Exception {
1162        setPriority(encodedRegionName);
1163        MoveRegionRequest request =
1164          RequestConverter.buildMoveRegionRequest(encodedRegionName, destServerName);
1165        master.moveRegion(getRpcController(), request);
1166        return null;
1167      }
1168    });
1169  }
1170
1171  @Override
1172  public void assign(final byte [] regionName) throws MasterNotRunningException,
1173      ZooKeeperConnectionException, IOException {
1174    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
1175      @Override
1176      protected Void rpcCall() throws Exception {
1177        setPriority(regionName);
1178        AssignRegionRequest request =
1179            RequestConverter.buildAssignRegionRequest(getRegionName(regionName));
1180        master.assignRegion(getRpcController(), request);
1181        return null;
1182      }
1183    });
1184  }
1185
1186  @Override
1187  public void unassign(final byte [] regionName, final boolean force) throws IOException {
1188    final byte[] toBeUnassigned = getRegionName(regionName);
1189    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
1190      @Override
1191      protected Void rpcCall() throws Exception {
1192        setPriority(regionName);
1193        UnassignRegionRequest request =
1194            RequestConverter.buildUnassignRegionRequest(toBeUnassigned, force);
1195        master.unassignRegion(getRpcController(), request);
1196        return null;
1197      }
1198    });
1199  }
1200
1201  @Override
1202  public void offline(final byte [] regionName)
1203  throws IOException {
1204    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
1205      @Override
1206      protected Void rpcCall() throws Exception {
1207        setPriority(regionName);
1208        master.offlineRegion(getRpcController(),
1209            RequestConverter.buildOfflineRegionRequest(regionName));
1210        return null;
1211      }
1212    });
1213  }
1214
1215  @Override
1216  public boolean balancerSwitch(final boolean on, final boolean synchronous)
1217  throws IOException {
1218    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1219      @Override
1220      protected Boolean rpcCall() throws Exception {
1221        SetBalancerRunningRequest req =
1222            RequestConverter.buildSetBalancerRunningRequest(on, synchronous);
1223        return master.setBalancerRunning(getRpcController(), req).getPrevBalanceValue();
1224      }
1225    });
1226  }
1227
1228  @Override
1229  public boolean balance() throws IOException {
1230    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1231      @Override
1232      protected Boolean rpcCall() throws Exception {
1233        return master.balance(getRpcController(),
1234            RequestConverter.buildBalanceRequest(false)).getBalancerRan();
1235      }
1236    });
1237  }
1238
1239  @Override
1240  public boolean balance(final boolean force) throws IOException {
1241    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1242      @Override
1243      protected Boolean rpcCall() throws Exception {
1244        return master.balance(getRpcController(),
1245            RequestConverter.buildBalanceRequest(force)).getBalancerRan();
1246      }
1247    });
1248  }
1249
1250  @Override
1251  public boolean isBalancerEnabled() throws IOException {
1252    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1253      @Override
1254      protected Boolean rpcCall() throws Exception {
1255        return master.isBalancerEnabled(getRpcController(),
1256          RequestConverter.buildIsBalancerEnabledRequest()).getEnabled();
1257      }
1258    });
1259  }
1260
1261  /**
1262   * {@inheritDoc}
1263   */
1264  @Override
1265  public CacheEvictionStats clearBlockCache(final TableName tableName) throws IOException {
1266    checkTableExists(tableName);
1267    CacheEvictionStatsBuilder cacheEvictionStats = CacheEvictionStats.builder();
1268    List<Pair<RegionInfo, ServerName>> pairs =
1269      MetaTableAccessor.getTableRegionsAndLocations(connection, tableName);
1270    Map<ServerName, List<RegionInfo>> regionInfoByServerName =
1271        pairs.stream()
1272            .filter(pair -> !(pair.getFirst().isOffline()))
1273            .filter(pair -> pair.getSecond() != null)
1274            .collect(Collectors.groupingBy(pair -> pair.getSecond(),
1275                Collectors.mapping(pair -> pair.getFirst(), Collectors.toList())));
1276
1277    for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
1278      CacheEvictionStats stats = clearBlockCache(entry.getKey(), entry.getValue());
1279      cacheEvictionStats = cacheEvictionStats.append(stats);
1280      if (stats.getExceptionCount() > 0) {
1281        for (Map.Entry<byte[], Throwable> exception : stats.getExceptions().entrySet()) {
1282          LOG.debug("Failed to clear block cache for "
1283              + Bytes.toStringBinary(exception.getKey())
1284              + " on " + entry.getKey() + ": ", exception.getValue());
1285        }
1286      }
1287    }
1288    return cacheEvictionStats.build();
1289  }
1290
1291  private CacheEvictionStats clearBlockCache(final ServerName sn, final List<RegionInfo> hris)
1292      throws IOException {
1293    HBaseRpcController controller = rpcControllerFactory.newController();
1294    AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
1295    ClearRegionBlockCacheRequest request =
1296      RequestConverter.buildClearRegionBlockCacheRequest(hris);
1297    ClearRegionBlockCacheResponse response;
1298    try {
1299      response = admin.clearRegionBlockCache(controller, request);
1300      return ProtobufUtil.toCacheEvictionStats(response.getStats());
1301    } catch (ServiceException se) {
1302      throw ProtobufUtil.getRemoteException(se);
1303    }
1304  }
1305
1306  /**
1307   * Invoke region normalizer. Can NOT run for various reasons.  Check logs.
1308   *
1309   * @return True if region normalizer ran, false otherwise.
1310   */
1311  @Override
1312  public boolean normalize() throws IOException {
1313    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1314      @Override
1315      protected Boolean rpcCall() throws Exception {
1316        return master.normalize(getRpcController(),
1317            RequestConverter.buildNormalizeRequest()).getNormalizerRan();
1318      }
1319    });
1320  }
1321
1322  @Override
1323  public boolean isNormalizerEnabled() throws IOException {
1324    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1325      @Override
1326      protected Boolean rpcCall() throws Exception {
1327        return master.isNormalizerEnabled(getRpcController(),
1328          RequestConverter.buildIsNormalizerEnabledRequest()).getEnabled();
1329      }
1330    });
1331  }
1332
1333  @Override
1334  public boolean normalizerSwitch(final boolean on) throws IOException {
1335    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1336      @Override
1337      protected Boolean rpcCall() throws Exception {
1338        SetNormalizerRunningRequest req =
1339          RequestConverter.buildSetNormalizerRunningRequest(on);
1340        return master.setNormalizerRunning(getRpcController(), req).getPrevNormalizerValue();
1341      }
1342    });
1343  }
1344
1345  @Override
1346  public boolean catalogJanitorSwitch(final boolean enable) throws IOException {
1347    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1348      @Override
1349      protected Boolean rpcCall() throws Exception {
1350        return master.enableCatalogJanitor(getRpcController(),
1351          RequestConverter.buildEnableCatalogJanitorRequest(enable)).getPrevValue();
1352      }
1353    });
1354  }
1355
1356  @Override
1357  public int runCatalogJanitor() throws IOException {
1358    return executeCallable(new MasterCallable<Integer>(getConnection(), getRpcControllerFactory()) {
1359      @Override
1360      protected Integer rpcCall() throws Exception {
1361        return master.runCatalogScan(getRpcController(),
1362          RequestConverter.buildCatalogScanRequest()).getScanResult();
1363      }
1364    });
1365  }
1366
1367  @Override
1368  public boolean isCatalogJanitorEnabled() throws IOException {
1369    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1370      @Override
1371      protected Boolean rpcCall() throws Exception {
1372        return master.isCatalogJanitorEnabled(getRpcController(),
1373          RequestConverter.buildIsCatalogJanitorEnabledRequest()).getValue();
1374      }
1375    });
1376  }
1377
1378  @Override
1379  public boolean cleanerChoreSwitch(final boolean on) throws IOException {
1380    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1381      @Override public Boolean rpcCall() throws Exception {
1382        return master.setCleanerChoreRunning(getRpcController(),
1383            RequestConverter.buildSetCleanerChoreRunningRequest(on)).getPrevValue();
1384      }
1385    });
1386  }
1387
1388  @Override
1389  public boolean runCleanerChore() throws IOException {
1390    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1391      @Override public Boolean rpcCall() throws Exception {
1392        return master.runCleanerChore(getRpcController(),
1393            RequestConverter.buildRunCleanerChoreRequest()).getCleanerChoreRan();
1394      }
1395    });
1396  }
1397
1398  @Override
1399  public boolean isCleanerChoreEnabled() throws IOException {
1400    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1401      @Override public Boolean rpcCall() throws Exception {
1402        return master.isCleanerChoreEnabled(getRpcController(),
1403            RequestConverter.buildIsCleanerChoreEnabledRequest()).getValue();
1404      }
1405    });
1406  }
1407
1408  /**
1409   * Merge two regions. Synchronous operation.
1410   * Note: It is not feasible to predict the length of merge.
1411   *   Therefore, this is for internal testing only.
1412   * @param nameOfRegionA encoded or full name of region a
1413   * @param nameOfRegionB encoded or full name of region b
1414   * @param forcible true if do a compulsory merge, otherwise we will only merge
1415   *          two adjacent regions
1416   * @throws IOException if a remote or network exception occurs
1417   */
1418  @VisibleForTesting
1419  public void mergeRegionsSync(
1420      final byte[] nameOfRegionA,
1421      final byte[] nameOfRegionB,
1422      final boolean forcible) throws IOException {
1423    get(
1424      mergeRegionsAsync(nameOfRegionA, nameOfRegionB, forcible),
1425      syncWaitTimeout,
1426      TimeUnit.MILLISECONDS);
1427  }
1428
1429  /**
1430   * Merge two regions. Asynchronous operation.
1431   * @param nameofRegionsToMerge encoded or full name of daughter regions
1432   * @param forcible true if do a compulsory merge, otherwise we will only merge
1433   *          adjacent regions
1434   */
1435  @Override
1436  public Future<Void> mergeRegionsAsync(final byte[][] nameofRegionsToMerge, final boolean forcible)
1437      throws IOException {
1438    Preconditions.checkArgument(nameofRegionsToMerge.length >= 2, "Can not merge only %s region",
1439      nameofRegionsToMerge.length);
1440    byte[][] encodedNameofRegionsToMerge = new byte[nameofRegionsToMerge.length][];
1441    for (int i = 0; i < nameofRegionsToMerge.length; i++) {
1442      encodedNameofRegionsToMerge[i] =
1443        RegionInfo.isEncodedRegionName(nameofRegionsToMerge[i]) ? nameofRegionsToMerge[i]
1444          : Bytes.toBytes(RegionInfo.encodeRegionName(nameofRegionsToMerge[i]));
1445    }
1446
1447    TableName tableName = null;
1448    Pair<RegionInfo, ServerName> pair;
1449
1450    for(int i = 0; i < nameofRegionsToMerge.length; i++) {
1451      pair = getRegion(nameofRegionsToMerge[i]);
1452
1453      if (pair != null) {
1454        if (pair.getFirst().getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1455          throw new IllegalArgumentException ("Can't invoke merge on non-default regions directly");
1456        }
1457        if (tableName == null) {
1458          tableName = pair.getFirst().getTable();
1459        } else  if (!tableName.equals(pair.getFirst().getTable())) {
1460          throw new IllegalArgumentException ("Cannot merge regions from two different tables " +
1461              tableName + " and " + pair.getFirst().getTable());
1462        }
1463      } else {
1464        throw new UnknownRegionException (
1465          "Can't invoke merge on unknown region "
1466          + Bytes.toStringBinary(encodedNameofRegionsToMerge[i]));
1467      }
1468    }
1469
1470    MergeTableRegionsResponse response =
1471        executeCallable(new MasterCallable<MergeTableRegionsResponse>(getConnection(),
1472            getRpcControllerFactory()) {
1473          Long nonceGroup = ng.getNonceGroup();
1474          Long nonce = ng.newNonce();
1475      @Override
1476      protected MergeTableRegionsResponse rpcCall() throws Exception {
1477        MergeTableRegionsRequest request = RequestConverter
1478            .buildMergeTableRegionsRequest(
1479                encodedNameofRegionsToMerge,
1480                forcible,
1481                nonceGroup,
1482                nonce);
1483        return master.mergeTableRegions(getRpcController(), request);
1484      }
1485    });
1486    return new MergeTableRegionsFuture(this, tableName, response);
1487  }
1488
1489  private static class MergeTableRegionsFuture extends TableFuture<Void> {
1490    public MergeTableRegionsFuture(
1491        final HBaseAdmin admin,
1492        final TableName tableName,
1493        final MergeTableRegionsResponse response) {
1494      super(admin, tableName,
1495          (response != null && response.hasProcId()) ? response.getProcId() : null);
1496    }
1497
1498    public MergeTableRegionsFuture(
1499        final HBaseAdmin admin,
1500        final TableName tableName,
1501        final Long procId) {
1502      super(admin, tableName, procId);
1503    }
1504
1505    @Override
1506    public String getOperationType() {
1507      return "MERGE_REGIONS";
1508    }
1509  }
1510  /**
1511   * Split one region. Synchronous operation.
1512   * Note: It is not feasible to predict the length of split.
1513   *   Therefore, this is for internal testing only.
1514   * @param regionName encoded or full name of region
1515   * @param splitPoint key where region splits
1516   * @throws IOException if a remote or network exception occurs
1517   */
1518  @VisibleForTesting
1519  public void splitRegionSync(byte[] regionName, byte[] splitPoint) throws IOException {
1520    splitRegionSync(regionName, splitPoint, syncWaitTimeout, TimeUnit.MILLISECONDS);
1521  }
1522
1523
1524  /**
1525   * Split one region. Synchronous operation.
1526   * @param regionName region to be split
1527   * @param splitPoint split point
1528   * @param timeout how long to wait on split
1529   * @param units time units
1530   * @throws IOException if a remote or network exception occurs
1531   */
1532  public void splitRegionSync(byte[] regionName, byte[] splitPoint, final long timeout,
1533      final TimeUnit units) throws IOException {
1534    get(splitRegionAsync(regionName, splitPoint), timeout, units);
1535  }
1536
1537  @Override
1538  public Future<Void> splitRegionAsync(byte[] regionName, byte[] splitPoint)
1539      throws IOException {
1540    byte[] encodedNameofRegionToSplit = RegionInfo.isEncodedRegionName(regionName) ?
1541        regionName : Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
1542    Pair<RegionInfo, ServerName> pair = getRegion(regionName);
1543    if (pair != null) {
1544      if (pair.getFirst() != null &&
1545          pair.getFirst().getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1546        throw new IllegalArgumentException ("Can't invoke split on non-default regions directly");
1547      }
1548    } else {
1549      throw new UnknownRegionException(
1550        "Can't invoke split on unknown region " + Bytes.toStringBinary(encodedNameofRegionToSplit));
1551    }
1552
1553    return splitRegionAsync(pair.getFirst(), splitPoint);
1554  }
1555
1556  Future<Void> splitRegionAsync(RegionInfo hri, byte[] splitPoint) throws IOException {
1557    TableName tableName = hri.getTable();
1558    if (hri.getStartKey() != null && splitPoint != null &&
1559        Bytes.compareTo(hri.getStartKey(), splitPoint) == 0) {
1560      throw new IOException("should not give a splitkey which equals to startkey!");
1561    }
1562
1563    SplitTableRegionResponse response = executeCallable(
1564        new MasterCallable<SplitTableRegionResponse>(getConnection(), getRpcControllerFactory()) {
1565          Long nonceGroup = ng.getNonceGroup();
1566          Long nonce = ng.newNonce();
1567          @Override
1568          protected SplitTableRegionResponse rpcCall() throws Exception {
1569            setPriority(tableName);
1570            SplitTableRegionRequest request = RequestConverter
1571                .buildSplitTableRegionRequest(hri, splitPoint, nonceGroup, nonce);
1572            return master.splitRegion(getRpcController(), request);
1573          }
1574        });
1575    return new SplitTableRegionFuture(this, tableName, response);
1576  }
1577
1578  private static class SplitTableRegionFuture extends TableFuture<Void> {
1579    public SplitTableRegionFuture(final HBaseAdmin admin,
1580        final TableName tableName,
1581        final SplitTableRegionResponse response) {
1582      super(admin, tableName,
1583          (response != null && response.hasProcId()) ? response.getProcId() : null);
1584    }
1585
1586    public SplitTableRegionFuture(
1587        final HBaseAdmin admin,
1588        final TableName tableName,
1589        final Long procId) {
1590      super(admin, tableName, procId);
1591    }
1592
1593    @Override
1594    public String getOperationType() {
1595      return "SPLIT_REGION";
1596    }
1597  }
1598
1599  @Override
1600  public void split(final TableName tableName) throws IOException {
1601    split(tableName, null);
1602  }
1603
1604  @Override
1605  public void split(final TableName tableName, final byte[] splitPoint) throws IOException {
1606    checkTableExists(tableName);
1607    for (HRegionLocation loc : connection.locateRegions(tableName, false, false)) {
1608      ServerName sn = loc.getServerName();
1609      if (sn == null) {
1610        continue;
1611      }
1612      RegionInfo r = loc.getRegion();
1613      // check for parents
1614      if (r.isSplitParent()) {
1615        continue;
1616      }
1617      // if a split point given, only split that particular region
1618      if (r.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID ||
1619          (splitPoint != null && !r.containsRow(splitPoint))) {
1620        continue;
1621      }
1622      // call out to master to do split now
1623      splitRegionAsync(r, splitPoint);
1624    }
1625  }
1626
1627  private static class ModifyTableFuture extends TableFuture<Void> {
1628    public ModifyTableFuture(final HBaseAdmin admin, final TableName tableName,
1629        final ModifyTableResponse response) {
1630      super(admin, tableName,
1631          (response != null && response.hasProcId()) ? response.getProcId() : null);
1632    }
1633
1634    public ModifyTableFuture(final HBaseAdmin admin, final TableName tableName, final Long procId) {
1635      super(admin, tableName, procId);
1636    }
1637
1638    @Override
1639    public String getOperationType() {
1640      return "MODIFY";
1641    }
1642  }
1643
1644  /**
1645   * @param regionName Name of a region.
1646   * @return a pair of HRegionInfo and ServerName if <code>regionName</code> is
1647   *  a verified region name (we call {@link
1648   *  MetaTableAccessor#getRegionLocation(Connection, byte[])}
1649   *  else null.
1650   * Throw IllegalArgumentException if <code>regionName</code> is null.
1651   * @throws IOException if a remote or network exception occurs
1652   */
1653  Pair<RegionInfo, ServerName> getRegion(final byte[] regionName) throws IOException {
1654    if (regionName == null) {
1655      throw new IllegalArgumentException("Pass a table name or region name");
1656    }
1657    Pair<RegionInfo, ServerName> pair = MetaTableAccessor.getRegion(connection, regionName);
1658    if (pair == null) {
1659      final AtomicReference<Pair<RegionInfo, ServerName>> result = new AtomicReference<>(null);
1660      final String encodedName = Bytes.toString(regionName);
1661      MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
1662        @Override
1663        public boolean visit(Result data) throws IOException {
1664          RegionInfo info = MetaTableAccessor.getRegionInfo(data);
1665          if (info == null) {
1666            LOG.warn("No serialized HRegionInfo in " + data);
1667            return true;
1668          }
1669          RegionLocations rl = MetaTableAccessor.getRegionLocations(data);
1670          boolean matched = false;
1671          ServerName sn = null;
1672          if (rl != null) {
1673            for (HRegionLocation h : rl.getRegionLocations()) {
1674              if (h != null && encodedName.equals(h.getRegion().getEncodedName())) {
1675                sn = h.getServerName();
1676                info = h.getRegion();
1677                matched = true;
1678              }
1679            }
1680          }
1681          if (!matched) return true;
1682          result.set(new Pair<>(info, sn));
1683          return false; // found the region, stop
1684        }
1685      };
1686
1687      MetaTableAccessor.fullScanRegions(connection, visitor);
1688      pair = result.get();
1689    }
1690    return pair;
1691  }
1692
1693  /**
1694   * If the input is a region name, it is returned as is. If it's an
1695   * encoded region name, the corresponding region is found from meta
1696   * and its region name is returned. If we can't find any region in
1697   * meta matching the input as either region name or encoded region
1698   * name, the input is returned as is. We don't throw unknown
1699   * region exception.
1700   */
1701  private byte[] getRegionName(final byte[] regionNameOrEncodedRegionName) throws IOException {
1702    if (Bytes.equals(regionNameOrEncodedRegionName,
1703      RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) ||
1704      Bytes.equals(regionNameOrEncodedRegionName,
1705        RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
1706      return RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName();
1707    }
1708    byte[] tmp = regionNameOrEncodedRegionName;
1709    Pair<RegionInfo, ServerName> regionServerPair = getRegion(regionNameOrEncodedRegionName);
1710    if (regionServerPair != null && regionServerPair.getFirst() != null) {
1711      tmp = regionServerPair.getFirst().getRegionName();
1712    }
1713    return tmp;
1714  }
1715
1716  /**
1717   * Check if table exists or not
1718   * @param tableName Name of a table.
1719   * @return tableName instance
1720   * @throws IOException if a remote or network exception occurs.
1721   * @throws TableNotFoundException if table does not exist.
1722   */
1723  private TableName checkTableExists(final TableName tableName)
1724      throws IOException {
1725    return executeCallable(new RpcRetryingCallable<TableName>() {
1726      @Override
1727      protected TableName rpcCall(int callTimeout) throws Exception {
1728        if (!MetaTableAccessor.tableExists(connection, tableName)) {
1729          throw new TableNotFoundException(tableName);
1730        }
1731        return tableName;
1732      }
1733    });
1734  }
1735
1736  @Override
1737  public synchronized void shutdown() throws IOException {
1738    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
1739      @Override
1740      protected Void rpcCall() throws Exception {
1741        setPriority(HConstants.HIGH_QOS);
1742        master.shutdown(getRpcController(), ShutdownRequest.newBuilder().build());
1743        return null;
1744      }
1745    });
1746  }
1747
1748  @Override
1749  public synchronized void stopMaster() throws IOException {
1750    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
1751      @Override
1752      protected Void rpcCall() throws Exception {
1753        setPriority(HConstants.HIGH_QOS);
1754        master.stopMaster(getRpcController(), StopMasterRequest.newBuilder().build());
1755        return null;
1756      }
1757    });
1758  }
1759
1760  @Override
1761  public synchronized void stopRegionServer(final String hostnamePort)
1762  throws IOException {
1763    String hostname = Addressing.parseHostname(hostnamePort);
1764    int port = Addressing.parsePort(hostnamePort);
1765    final AdminService.BlockingInterface admin =
1766      this.connection.getAdmin(ServerName.valueOf(hostname, port, 0));
1767    // TODO: There is no timeout on this controller. Set one!
1768    HBaseRpcController controller = rpcControllerFactory.newController();
1769    controller.setPriority(HConstants.HIGH_QOS);
1770    StopServerRequest request = RequestConverter.buildStopServerRequest(
1771        "Called by admin client " + this.connection.toString());
1772    try {
1773      admin.stopServer(controller, request);
1774    } catch (Exception e) {
1775      throw ProtobufUtil.handleRemoteException(e);
1776    }
1777  }
1778
1779  @Override
1780  public boolean isMasterInMaintenanceMode() throws IOException {
1781    return executeCallable(new MasterCallable<IsInMaintenanceModeResponse>(getConnection(),
1782        this.rpcControllerFactory) {
1783      @Override
1784      protected IsInMaintenanceModeResponse rpcCall() throws Exception {
1785        return master.isMasterInMaintenanceMode(getRpcController(),
1786            IsInMaintenanceModeRequest.newBuilder().build());
1787      }
1788    }).getInMaintenanceMode();
1789  }
1790
1791  @Override
1792  public ClusterMetrics getClusterMetrics(EnumSet<Option> options) throws IOException {
1793    return executeCallable(new MasterCallable<ClusterMetrics>(getConnection(),
1794        this.rpcControllerFactory) {
1795      @Override
1796      protected ClusterMetrics rpcCall() throws Exception {
1797        GetClusterStatusRequest req = RequestConverter.buildGetClusterStatusRequest(options);
1798        return ClusterMetricsBuilder.toClusterMetrics(
1799          master.getClusterStatus(getRpcController(), req).getClusterStatus());
1800      }
1801    });
1802  }
1803
1804  @Override
1805  public List<RegionMetrics> getRegionMetrics(ServerName serverName, TableName tableName)
1806      throws IOException {
1807    AdminService.BlockingInterface admin = this.connection.getAdmin(serverName);
1808    HBaseRpcController controller = rpcControllerFactory.newController();
1809    AdminProtos.GetRegionLoadRequest request =
1810      RequestConverter.buildGetRegionLoadRequest(tableName);
1811    try {
1812      return admin.getRegionLoad(controller, request).getRegionLoadsList().stream()
1813        .map(RegionMetricsBuilder::toRegionMetrics).collect(Collectors.toList());
1814    } catch (ServiceException se) {
1815      throw ProtobufUtil.getRemoteException(se);
1816    }
1817  }
1818
1819  @Override
1820  public Configuration getConfiguration() {
1821    return this.conf;
1822  }
1823
1824  /**
1825   * Do a get with a timeout against the passed in <code>future</code>.
1826   */
1827  private static <T> T get(final Future<T> future, final long timeout, final TimeUnit units)
1828  throws IOException {
1829    try {
1830      // TODO: how long should we wait? Spin forever?
1831      return future.get(timeout, units);
1832    } catch (InterruptedException e) {
1833      IOException ioe = new InterruptedIOException("Interrupt while waiting on " + future);
1834      ioe.initCause(e);
1835      throw ioe;
1836    } catch (TimeoutException e) {
1837      throw new TimeoutIOException(e);
1838    } catch (ExecutionException e) {
1839      if (e.getCause() instanceof IOException) {
1840        throw (IOException)e.getCause();
1841      } else {
1842        throw new IOException(e.getCause());
1843      }
1844    }
1845  }
1846
1847  @Override
1848  public Future<Void> createNamespaceAsync(final NamespaceDescriptor descriptor)
1849      throws IOException {
1850    CreateNamespaceResponse response =
1851        executeCallable(new MasterCallable<CreateNamespaceResponse>(getConnection(),
1852            getRpcControllerFactory()) {
1853      @Override
1854      protected CreateNamespaceResponse rpcCall() throws Exception {
1855        return master.createNamespace(getRpcController(),
1856          CreateNamespaceRequest.newBuilder().setNamespaceDescriptor(ProtobufUtil.
1857              toProtoNamespaceDescriptor(descriptor)).build());
1858      }
1859    });
1860    return new NamespaceFuture(this, descriptor.getName(), response.getProcId()) {
1861      @Override
1862      public String getOperationType() {
1863        return "CREATE_NAMESPACE";
1864      }
1865    };
1866  }
1867
1868  @Override
1869  public Future<Void> modifyNamespaceAsync(final NamespaceDescriptor descriptor)
1870      throws IOException {
1871    ModifyNamespaceResponse response =
1872        executeCallable(new MasterCallable<ModifyNamespaceResponse>(getConnection(),
1873            getRpcControllerFactory()) {
1874      @Override
1875      protected ModifyNamespaceResponse rpcCall() throws Exception {
1876        // TODO: set priority based on NS?
1877        return master.modifyNamespace(getRpcController(), ModifyNamespaceRequest.newBuilder().
1878          setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build());
1879       }
1880    });
1881    return new NamespaceFuture(this, descriptor.getName(), response.getProcId()) {
1882      @Override
1883      public String getOperationType() {
1884        return "MODIFY_NAMESPACE";
1885      }
1886    };
1887  }
1888
1889  @Override
1890  public Future<Void> deleteNamespaceAsync(final String name)
1891      throws IOException {
1892    DeleteNamespaceResponse response =
1893        executeCallable(new MasterCallable<DeleteNamespaceResponse>(getConnection(),
1894            getRpcControllerFactory()) {
1895      @Override
1896      protected DeleteNamespaceResponse rpcCall() throws Exception {
1897        // TODO: set priority based on NS?
1898        return master.deleteNamespace(getRpcController(), DeleteNamespaceRequest.newBuilder().
1899          setNamespaceName(name).build());
1900        }
1901      });
1902    return new NamespaceFuture(this, name, response.getProcId()) {
1903      @Override
1904      public String getOperationType() {
1905        return "DELETE_NAMESPACE";
1906      }
1907    };
1908  }
1909
1910  @Override
1911  public NamespaceDescriptor getNamespaceDescriptor(final String name)
1912      throws NamespaceNotFoundException, IOException {
1913    return executeCallable(new MasterCallable<NamespaceDescriptor>(getConnection(),
1914        getRpcControllerFactory()) {
1915      @Override
1916      protected NamespaceDescriptor rpcCall() throws Exception {
1917        return ProtobufUtil.toNamespaceDescriptor(
1918            master.getNamespaceDescriptor(getRpcController(),
1919                GetNamespaceDescriptorRequest.newBuilder().
1920                  setNamespaceName(name).build()).getNamespaceDescriptor());
1921      }
1922    });
1923  }
1924
1925  /**
1926   * List available namespaces
1927   * @return List of namespace names
1928   * @throws IOException if a remote or network exception occurs
1929   */
1930  @Override
1931  public String[] listNamespaces() throws IOException {
1932    return executeCallable(new MasterCallable<String[]>(getConnection(),
1933        getRpcControllerFactory()) {
1934      @Override
1935      protected String[] rpcCall() throws Exception {
1936        List<String> list = master.listNamespaces(getRpcController(),
1937          ListNamespacesRequest.newBuilder().build()).getNamespaceNameList();
1938        return list.toArray(new String[list.size()]);
1939      }
1940    });
1941  }
1942
1943  /**
1944   * List available namespace descriptors
1945   * @return List of descriptors
1946   * @throws IOException if a remote or network exception occurs
1947   */
1948  @Override
1949  public NamespaceDescriptor[] listNamespaceDescriptors() throws IOException {
1950    return executeCallable(new MasterCallable<NamespaceDescriptor[]>(getConnection(),
1951        getRpcControllerFactory()) {
1952      @Override
1953      protected NamespaceDescriptor[] rpcCall() throws Exception {
1954        List<HBaseProtos.NamespaceDescriptor> list =
1955            master.listNamespaceDescriptors(getRpcController(),
1956              ListNamespaceDescriptorsRequest.newBuilder().build()).getNamespaceDescriptorList();
1957        NamespaceDescriptor[] res = new NamespaceDescriptor[list.size()];
1958        for(int i = 0; i < list.size(); i++) {
1959          res[i] = ProtobufUtil.toNamespaceDescriptor(list.get(i));
1960        }
1961        return res;
1962      }
1963    });
1964  }
1965
1966  @Override
1967  public String getProcedures() throws IOException {
1968    return executeCallable(new MasterCallable<String>(getConnection(),
1969        getRpcControllerFactory()) {
1970      @Override
1971      protected String rpcCall() throws Exception {
1972        GetProceduresRequest request = GetProceduresRequest.newBuilder().build();
1973        GetProceduresResponse response = master.getProcedures(getRpcController(), request);
1974        return ProtobufUtil.toProcedureJson(response.getProcedureList());
1975      }
1976    });
1977  }
1978
1979  @Override
1980  public String getLocks() throws IOException {
1981    return executeCallable(new MasterCallable<String>(getConnection(),
1982        getRpcControllerFactory()) {
1983      @Override
1984      protected String rpcCall() throws Exception {
1985        GetLocksRequest request = GetLocksRequest.newBuilder().build();
1986        GetLocksResponse response = master.getLocks(getRpcController(), request);
1987        return ProtobufUtil.toLockJson(response.getLockList());
1988      }
1989    });
1990  }
1991
1992  @Override
1993  public TableName[] listTableNamesByNamespace(final String name) throws IOException {
1994    return executeCallable(new MasterCallable<TableName[]>(getConnection(),
1995        getRpcControllerFactory()) {
1996      @Override
1997      protected TableName[] rpcCall() throws Exception {
1998        List<HBaseProtos.TableName> tableNames =
1999            master.listTableNamesByNamespace(getRpcController(), ListTableNamesByNamespaceRequest.
2000                newBuilder().setNamespaceName(name).build())
2001            .getTableNameList();
2002        TableName[] result = new TableName[tableNames.size()];
2003        for (int i = 0; i < tableNames.size(); i++) {
2004          result[i] = ProtobufUtil.toTableName(tableNames.get(i));
2005        }
2006        return result;
2007      }
2008    });
2009  }
2010
2011  /**
2012   * Is HBase available? Throw an exception if not.
2013   * <p/>
2014   * TODO: do not expose ZKConnectionException.
2015   * @param conf system configuration
2016   * @throws MasterNotRunningException if the master is not running.
2017   * @throws ZooKeeperConnectionException if unable to connect to zookeeper.
2018   */
2019  public static void available(final Configuration conf)
2020      throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
2021    Configuration copyOfConf = HBaseConfiguration.create(conf);
2022    // We set it to make it fail as soon as possible if HBase is not available
2023    copyOfConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
2024    copyOfConf.setInt("zookeeper.recovery.retry", 0);
2025
2026    // Check ZK first.
2027    // If the connection exists, we may have a connection to ZK that does not work anymore
2028    try (ClusterConnection connection =
2029      (ClusterConnection) ConnectionFactory.createConnection(copyOfConf)) {
2030      // can throw MasterNotRunningException
2031      connection.isMasterRunning();
2032    }
2033  }
2034
2035  private RollWALWriterResponse rollWALWriterImpl(final ServerName sn) throws IOException,
2036      FailedLogCloseException {
2037    final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
2038    RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest();
2039    // TODO: There is no timeout on this controller. Set one!
2040    HBaseRpcController controller = rpcControllerFactory.newController();
2041    try {
2042      return admin.rollWALWriter(controller, request);
2043    } catch (ServiceException e) {
2044      throw ProtobufUtil.handleRemoteException(e);
2045    }
2046  }
2047
2048  @Override
2049  public synchronized void rollWALWriter(ServerName serverName)
2050      throws IOException, FailedLogCloseException {
2051    rollWALWriterImpl(serverName);
2052  }
2053
2054  @Override
2055  public CompactionState getCompactionState(final TableName tableName)
2056  throws IOException {
2057    return getCompactionState(tableName, CompactType.NORMAL);
2058  }
2059
2060  @Override
2061  public CompactionState getCompactionStateForRegion(final byte[] regionName)
2062  throws IOException {
2063    final Pair<RegionInfo, ServerName> regionServerPair = getRegion(regionName);
2064    if (regionServerPair == null) {
2065      throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName));
2066    }
2067    if (regionServerPair.getSecond() == null) {
2068      throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
2069    }
2070    ServerName sn = regionServerPair.getSecond();
2071    final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
2072    // TODO: There is no timeout on this controller. Set one!
2073    HBaseRpcController controller = rpcControllerFactory.newController();
2074    GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
2075      regionServerPair.getFirst().getRegionName(), true);
2076    GetRegionInfoResponse response;
2077    try {
2078      response = admin.getRegionInfo(controller, request);
2079    } catch (ServiceException e) {
2080      throw ProtobufUtil.handleRemoteException(e);
2081    }
2082    if (response.getCompactionState() != null) {
2083      return ProtobufUtil.createCompactionState(response.getCompactionState());
2084    }
2085    return null;
2086  }
2087
2088  @Override
2089  public void snapshot(SnapshotDescription snapshotDesc)
2090      throws IOException, SnapshotCreationException, IllegalArgumentException {
2091    // actually take the snapshot
2092    SnapshotProtos.SnapshotDescription snapshot =
2093      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
2094    SnapshotResponse response = asyncSnapshot(snapshot);
2095    final IsSnapshotDoneRequest request =
2096        IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build();
2097    IsSnapshotDoneResponse done = null;
2098    long start = EnvironmentEdgeManager.currentTime();
2099    long max = response.getExpectedTimeout();
2100    long maxPauseTime = max / this.numRetries;
2101    int tries = 0;
2102    LOG.debug("Waiting a max of " + max + " ms for snapshot '" +
2103        ClientSnapshotDescriptionUtils.toString(snapshot) + "'' to complete. (max " +
2104        maxPauseTime + " ms per retry)");
2105    while (tries == 0
2106        || ((EnvironmentEdgeManager.currentTime() - start) < max && !done.getDone())) {
2107      try {
2108        // sleep a backoff <= pauseTime amount
2109        long sleep = getPauseTime(tries++);
2110        sleep = sleep > maxPauseTime ? maxPauseTime : sleep;
2111        LOG.debug("(#" + tries + ") Sleeping: " + sleep +
2112          "ms while waiting for snapshot completion.");
2113        Thread.sleep(sleep);
2114      } catch (InterruptedException e) {
2115        throw (InterruptedIOException)new InterruptedIOException("Interrupted").initCause(e);
2116      }
2117      LOG.debug("Getting current status of snapshot from master...");
2118      done = executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection(),
2119          getRpcControllerFactory()) {
2120        @Override
2121        protected IsSnapshotDoneResponse rpcCall() throws Exception {
2122          return master.isSnapshotDone(getRpcController(), request);
2123        }
2124      });
2125    }
2126    if (!done.getDone()) {
2127      throw new SnapshotCreationException("Snapshot '" + snapshot.getName()
2128          + "' wasn't completed in expectedTime:" + max + " ms", snapshotDesc);
2129    }
2130  }
2131
2132  @Override
2133  public Future<Void> snapshotAsync(SnapshotDescription snapshotDesc)
2134      throws IOException, SnapshotCreationException {
2135    asyncSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc));
2136    return new ProcedureFuture<Void>(this, null) {
2137
2138      @Override
2139      protected Void waitOperationResult(long deadlineTs) throws IOException, TimeoutException {
2140        waitForState(deadlineTs, new WaitForStateCallable() {
2141
2142          @Override
2143          public void throwInterruptedException() throws InterruptedIOException {
2144            throw new InterruptedIOException(
2145              "Interrupted while waiting for taking snapshot" + snapshotDesc);
2146          }
2147
2148          @Override
2149          public void throwTimeoutException(long elapsedTime) throws TimeoutException {
2150            throw new TimeoutException("Snapshot '" + snapshotDesc.getName() +
2151              "' wasn't completed in expectedTime:" + elapsedTime + " ms");
2152          }
2153
2154          @Override
2155          public boolean checkState(int tries) throws IOException {
2156            return isSnapshotFinished(snapshotDesc);
2157          }
2158        });
2159        return null;
2160      }
2161    };
2162  }
2163
2164  private SnapshotResponse asyncSnapshot(SnapshotProtos.SnapshotDescription snapshot)
2165      throws IOException {
2166    ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2167    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot)
2168        .build();
2169    // run the snapshot on the master
2170    return executeCallable(new MasterCallable<SnapshotResponse>(getConnection(),
2171        getRpcControllerFactory()) {
2172      @Override
2173      protected SnapshotResponse rpcCall() throws Exception {
2174        return master.snapshot(getRpcController(), request);
2175      }
2176    });
2177  }
2178
2179  @Override
2180  public boolean isSnapshotFinished(final SnapshotDescription snapshotDesc)
2181      throws IOException, HBaseSnapshotException, UnknownSnapshotException {
2182    final SnapshotProtos.SnapshotDescription snapshot =
2183        ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
2184    return executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection(),
2185        getRpcControllerFactory()) {
2186      @Override
2187      protected IsSnapshotDoneResponse rpcCall() throws Exception {
2188        return master.isSnapshotDone(getRpcController(),
2189          IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build());
2190      }
2191    }).getDone();
2192  }
2193
2194  @Override
2195  public void restoreSnapshot(final String snapshotName)
2196      throws IOException, RestoreSnapshotException {
2197    boolean takeFailSafeSnapshot =
2198        conf.getBoolean(HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
2199          HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
2200    restoreSnapshot(snapshotName, takeFailSafeSnapshot);
2201  }
2202
2203  /**
2204   * Check whether the snapshot exists and contains disabled table
2205   *
2206   * @param snapshotName name of the snapshot to restore
2207   * @throws IOException if a remote or network exception occurs
2208   * @throws RestoreSnapshotException if no valid snapshot is found
2209   */
2210  private TableName getTableNameBeforeRestoreSnapshot(final String snapshotName)
2211      throws IOException, RestoreSnapshotException {
2212    TableName tableName = null;
2213    for (SnapshotDescription snapshotInfo: listSnapshots()) {
2214      if (snapshotInfo.getName().equals(snapshotName)) {
2215        tableName = snapshotInfo.getTableName();
2216        break;
2217      }
2218    }
2219
2220    if (tableName == null) {
2221      throw new RestoreSnapshotException(
2222        "Unable to find the table name for snapshot=" + snapshotName);
2223    }
2224    return tableName;
2225  }
2226
2227  @Override
2228  public void restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot)
2229      throws IOException, RestoreSnapshotException {
2230    restoreSnapshot(snapshotName, takeFailSafeSnapshot, false);
2231  }
2232
2233  @Override
2234  public void restoreSnapshot(final String snapshotName, final boolean takeFailSafeSnapshot,
2235      final boolean restoreAcl) throws IOException, RestoreSnapshotException {
2236    TableName tableName = getTableNameBeforeRestoreSnapshot(snapshotName);
2237
2238    // The table does not exists, switch to clone.
2239    if (!tableExists(tableName)) {
2240      cloneSnapshot(snapshotName, tableName, restoreAcl);
2241      return;
2242    }
2243
2244    // Check if the table is disabled
2245    if (!isTableDisabled(tableName)) {
2246      throw new TableNotDisabledException(tableName);
2247    }
2248
2249    // Take a snapshot of the current state
2250    String failSafeSnapshotSnapshotName = null;
2251    if (takeFailSafeSnapshot) {
2252      failSafeSnapshotSnapshotName = conf.get("hbase.snapshot.restore.failsafe.name",
2253        "hbase-failsafe-{snapshot.name}-{restore.timestamp}");
2254      failSafeSnapshotSnapshotName = failSafeSnapshotSnapshotName
2255        .replace("{snapshot.name}", snapshotName)
2256        .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
2257        .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
2258      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2259      snapshot(failSafeSnapshotSnapshotName, tableName);
2260    }
2261
2262    try {
2263      // Restore snapshot
2264      get(
2265        internalRestoreSnapshotAsync(snapshotName, tableName, restoreAcl),
2266        syncWaitTimeout,
2267        TimeUnit.MILLISECONDS);
2268    } catch (IOException e) {
2269      // Something went wrong during the restore...
2270      // if the pre-restore snapshot is available try to rollback
2271      if (takeFailSafeSnapshot) {
2272        try {
2273          get(
2274            internalRestoreSnapshotAsync(failSafeSnapshotSnapshotName, tableName, restoreAcl),
2275            syncWaitTimeout,
2276            TimeUnit.MILLISECONDS);
2277          String msg = "Restore snapshot=" + snapshotName +
2278            " failed. Rollback to snapshot=" + failSafeSnapshotSnapshotName + " succeeded.";
2279          LOG.error(msg, e);
2280          throw new RestoreSnapshotException(msg, e);
2281        } catch (IOException ex) {
2282          String msg = "Failed to restore and rollback to snapshot=" + failSafeSnapshotSnapshotName;
2283          LOG.error(msg, ex);
2284          throw new RestoreSnapshotException(msg, e);
2285        }
2286      } else {
2287        throw new RestoreSnapshotException("Failed to restore snapshot=" + snapshotName, e);
2288      }
2289    }
2290
2291    // If the restore is succeeded, delete the pre-restore snapshot
2292    if (takeFailSafeSnapshot) {
2293      try {
2294        LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2295        deleteSnapshot(failSafeSnapshotSnapshotName);
2296      } catch (IOException e) {
2297        LOG.error("Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, e);
2298      }
2299    }
2300  }
2301
2302  @Override
2303  public Future<Void> cloneSnapshotAsync(String snapshotName, TableName tableName,
2304      boolean restoreAcl) throws IOException, TableExistsException, RestoreSnapshotException {
2305    if (tableExists(tableName)) {
2306      throw new TableExistsException(tableName);
2307    }
2308    return internalRestoreSnapshotAsync(snapshotName, tableName, restoreAcl);
2309  }
2310
2311  @Override
2312  public byte[] execProcedureWithReturn(String signature, String instance, Map<String,
2313      String> props) throws IOException {
2314    ProcedureDescription desc = ProtobufUtil.buildProcedureDescription(signature, instance, props);
2315    final ExecProcedureRequest request =
2316        ExecProcedureRequest.newBuilder().setProcedure(desc).build();
2317    // run the procedure on the master
2318    ExecProcedureResponse response = executeCallable(
2319      new MasterCallable<ExecProcedureResponse>(getConnection(), getRpcControllerFactory()) {
2320        @Override
2321        protected ExecProcedureResponse rpcCall() throws Exception {
2322          return master.execProcedureWithRet(getRpcController(), request);
2323        }
2324      });
2325
2326    return response.hasReturnData() ? response.getReturnData().toByteArray() : null;
2327  }
2328
2329  @Override
2330  public void execProcedure(String signature, String instance, Map<String, String> props)
2331      throws IOException {
2332    ProcedureDescription desc = ProtobufUtil.buildProcedureDescription(signature, instance, props);
2333    final ExecProcedureRequest request =
2334        ExecProcedureRequest.newBuilder().setProcedure(desc).build();
2335    // run the procedure on the master
2336    ExecProcedureResponse response = executeCallable(new MasterCallable<ExecProcedureResponse>(
2337        getConnection(), getRpcControllerFactory()) {
2338      @Override
2339      protected ExecProcedureResponse rpcCall() throws Exception {
2340        return master.execProcedure(getRpcController(), request);
2341      }
2342    });
2343
2344    long start = EnvironmentEdgeManager.currentTime();
2345    long max = response.getExpectedTimeout();
2346    long maxPauseTime = max / this.numRetries;
2347    int tries = 0;
2348    LOG.debug("Waiting a max of " + max + " ms for procedure '" +
2349        signature + " : " + instance + "'' to complete. (max " + maxPauseTime + " ms per retry)");
2350    boolean done = false;
2351    while (tries == 0
2352        || ((EnvironmentEdgeManager.currentTime() - start) < max && !done)) {
2353      try {
2354        // sleep a backoff <= pauseTime amount
2355        long sleep = getPauseTime(tries++);
2356        sleep = sleep > maxPauseTime ? maxPauseTime : sleep;
2357        LOG.debug("(#" + tries + ") Sleeping: " + sleep +
2358          "ms while waiting for procedure completion.");
2359        Thread.sleep(sleep);
2360      } catch (InterruptedException e) {
2361        throw (InterruptedIOException)new InterruptedIOException("Interrupted").initCause(e);
2362      }
2363      LOG.debug("Getting current status of procedure from master...");
2364      done = isProcedureFinished(signature, instance, props);
2365    }
2366    if (!done) {
2367      throw new IOException("Procedure '" + signature + " : " + instance
2368          + "' wasn't completed in expectedTime:" + max + " ms");
2369    }
2370  }
2371
2372  @Override
2373  public boolean isProcedureFinished(String signature, String instance, Map<String, String> props)
2374      throws IOException {
2375    ProcedureDescription desc = ProtobufUtil.buildProcedureDescription(signature, instance, props);
2376    return executeCallable(
2377      new MasterCallable<IsProcedureDoneResponse>(getConnection(), getRpcControllerFactory()) {
2378        @Override
2379        protected IsProcedureDoneResponse rpcCall() throws Exception {
2380          return master.isProcedureDone(getRpcController(),
2381            IsProcedureDoneRequest.newBuilder().setProcedure(desc).build());
2382        }
2383      }).getDone();
2384  }
2385
2386  /**
2387   * Execute Restore/Clone snapshot and wait for the server to complete (blocking).
2388   * To check if the cloned table exists, use {@link #isTableAvailable} -- it is not safe to
2389   * create an HTable instance to this table before it is available.
2390   * @param snapshotName snapshot to restore
2391   * @param tableName table name to restore the snapshot on
2392   * @throws IOException if a remote or network exception occurs
2393   * @throws RestoreSnapshotException if snapshot failed to be restored
2394   * @throws IllegalArgumentException if the restore request is formatted incorrectly
2395   */
2396  private Future<Void> internalRestoreSnapshotAsync(final String snapshotName,
2397      final TableName tableName, final boolean restoreAcl)
2398      throws IOException, RestoreSnapshotException {
2399    final SnapshotProtos.SnapshotDescription snapshot =
2400        SnapshotProtos.SnapshotDescription.newBuilder()
2401        .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2402
2403    // actually restore the snapshot
2404    ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2405
2406    RestoreSnapshotResponse response = executeCallable(
2407        new MasterCallable<RestoreSnapshotResponse>(getConnection(), getRpcControllerFactory()) {
2408          Long nonceGroup = ng.getNonceGroup();
2409          Long nonce = ng.newNonce();
2410      @Override
2411      protected RestoreSnapshotResponse rpcCall() throws Exception {
2412        final RestoreSnapshotRequest request = RestoreSnapshotRequest.newBuilder()
2413            .setSnapshot(snapshot)
2414            .setNonceGroup(nonceGroup)
2415            .setNonce(nonce)
2416            .setRestoreACL(restoreAcl)
2417            .build();
2418        return master.restoreSnapshot(getRpcController(), request);
2419      }
2420    });
2421
2422    return new RestoreSnapshotFuture(this, snapshot, tableName, response);
2423  }
2424
2425  private static class RestoreSnapshotFuture extends TableFuture<Void> {
2426    public RestoreSnapshotFuture(
2427        final HBaseAdmin admin,
2428        final SnapshotProtos.SnapshotDescription snapshot,
2429        final TableName tableName,
2430        final RestoreSnapshotResponse response) {
2431      super(admin, tableName,
2432          (response != null && response.hasProcId()) ? response.getProcId() : null);
2433
2434      if (response != null && !response.hasProcId()) {
2435        throw new UnsupportedOperationException("Client could not call old version of Server");
2436      }
2437    }
2438
2439    public RestoreSnapshotFuture(
2440        final HBaseAdmin admin,
2441        final TableName tableName,
2442        final Long procId) {
2443      super(admin, tableName, procId);
2444    }
2445
2446    @Override
2447    public String getOperationType() {
2448      return "MODIFY";
2449    }
2450  }
2451
2452  @Override
2453  public List<SnapshotDescription> listSnapshots() throws IOException {
2454    return executeCallable(new MasterCallable<List<SnapshotDescription>>(getConnection(),
2455        getRpcControllerFactory()) {
2456      @Override
2457      protected List<SnapshotDescription> rpcCall() throws Exception {
2458        List<SnapshotProtos.SnapshotDescription> snapshotsList = master
2459            .getCompletedSnapshots(getRpcController(),
2460                GetCompletedSnapshotsRequest.newBuilder().build())
2461            .getSnapshotsList();
2462        List<SnapshotDescription> result = new ArrayList<>(snapshotsList.size());
2463        for (SnapshotProtos.SnapshotDescription snapshot : snapshotsList) {
2464          result.add(ProtobufUtil.createSnapshotDesc(snapshot));
2465        }
2466        return result;
2467      }
2468    });
2469  }
2470
2471  @Override
2472  public List<SnapshotDescription> listSnapshots(Pattern pattern) throws IOException {
2473    List<SnapshotDescription> matched = new LinkedList<>();
2474    List<SnapshotDescription> snapshots = listSnapshots();
2475    for (SnapshotDescription snapshot : snapshots) {
2476      if (pattern.matcher(snapshot.getName()).matches()) {
2477        matched.add(snapshot);
2478      }
2479    }
2480    return matched;
2481  }
2482
2483  @Override
2484  public List<SnapshotDescription> listTableSnapshots(Pattern tableNamePattern,
2485      Pattern snapshotNamePattern) throws IOException {
2486    TableName[] tableNames = listTableNames(tableNamePattern);
2487
2488    List<SnapshotDescription> tableSnapshots = new LinkedList<>();
2489    List<SnapshotDescription> snapshots = listSnapshots(snapshotNamePattern);
2490
2491    List<TableName> listOfTableNames = Arrays.asList(tableNames);
2492    for (SnapshotDescription snapshot : snapshots) {
2493      if (listOfTableNames.contains(snapshot.getTableName())) {
2494        tableSnapshots.add(snapshot);
2495      }
2496    }
2497    return tableSnapshots;
2498  }
2499
2500  @Override
2501  public void deleteSnapshot(final String snapshotName) throws IOException {
2502    // make sure the snapshot is possibly valid
2503    TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(snapshotName));
2504    // do the delete
2505    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
2506      @Override
2507      protected Void rpcCall() throws Exception {
2508        master.deleteSnapshot(getRpcController(),
2509          DeleteSnapshotRequest.newBuilder().setSnapshot(
2510                SnapshotProtos.SnapshotDescription.newBuilder().setName(snapshotName).build())
2511              .build()
2512        );
2513        return null;
2514      }
2515    });
2516  }
2517
2518  @Override
2519  public void deleteSnapshots(final Pattern pattern) throws IOException {
2520    List<SnapshotDescription> snapshots = listSnapshots(pattern);
2521    for (final SnapshotDescription snapshot : snapshots) {
2522      try {
2523        internalDeleteSnapshot(snapshot);
2524      } catch (IOException ex) {
2525        LOG.info("Failed to delete snapshot " + snapshot.getName() + " for table "
2526                + snapshot.getTableNameAsString(), ex);
2527      }
2528    }
2529  }
2530
2531  private void internalDeleteSnapshot(final SnapshotDescription snapshot) throws IOException {
2532    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
2533      @Override
2534      protected Void rpcCall() throws Exception {
2535        this.master.deleteSnapshot(getRpcController(), DeleteSnapshotRequest.newBuilder()
2536          .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build());
2537        return null;
2538      }
2539    });
2540  }
2541
2542  @Override
2543  public void deleteTableSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern)
2544      throws IOException {
2545    List<SnapshotDescription> snapshots = listTableSnapshots(tableNamePattern, snapshotNamePattern);
2546    for (SnapshotDescription snapshot : snapshots) {
2547      try {
2548        internalDeleteSnapshot(snapshot);
2549        LOG.debug("Successfully deleted snapshot: " + snapshot.getName());
2550      } catch (IOException e) {
2551        LOG.error("Failed to delete snapshot: " + snapshot.getName(), e);
2552      }
2553    }
2554  }
2555
2556  @Override
2557  public void setQuota(final QuotaSettings quota) throws IOException {
2558    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
2559      @Override
2560      protected Void rpcCall() throws Exception {
2561        this.master.setQuota(getRpcController(), QuotaSettings.buildSetQuotaRequestProto(quota));
2562        return null;
2563      }
2564    });
2565  }
2566
2567  @Override
2568  public List<QuotaSettings> getQuota(QuotaFilter filter) throws IOException {
2569    List<QuotaSettings> quotas = new LinkedList<>();
2570    try (QuotaRetriever retriever = QuotaRetriever.open(conf, filter)) {
2571      Iterator<QuotaSettings> iterator = retriever.iterator();
2572      while (iterator.hasNext()) {
2573        quotas.add(iterator.next());
2574      }
2575    }
2576    return quotas;
2577  }
2578
2579  private <C extends RetryingCallable<V> & Closeable, V> V executeCallable(C callable)
2580      throws IOException {
2581    return executeCallable(callable, rpcCallerFactory, operationTimeout, rpcTimeout);
2582  }
2583
2584  static private <C extends RetryingCallable<V> & Closeable, V> V executeCallable(C callable,
2585             RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout, int rpcTimeout)
2586  throws IOException {
2587    RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller(rpcTimeout);
2588    try {
2589      return caller.callWithRetries(callable, operationTimeout);
2590    } finally {
2591      callable.close();
2592    }
2593  }
2594
2595  @Override
2596  // Coprocessor Endpoint against the Master.
2597  public CoprocessorRpcChannel coprocessorService() {
2598    return new SyncCoprocessorRpcChannel() {
2599      @Override
2600      protected Message callExecService(final RpcController controller,
2601          final Descriptors.MethodDescriptor method, final Message request,
2602          final Message responsePrototype)
2603      throws IOException {
2604        if (LOG.isTraceEnabled()) {
2605          LOG.trace("Call: " + method.getName() + ", " + request.toString());
2606        }
2607        // Try-with-resources so close gets called when we are done.
2608        try (MasterCallable<CoprocessorServiceResponse> callable =
2609            new MasterCallable<CoprocessorServiceResponse>(connection,
2610                connection.getRpcControllerFactory()) {
2611          @Override
2612          protected CoprocessorServiceResponse rpcCall() throws Exception {
2613            CoprocessorServiceRequest csr =
2614                CoprocessorRpcUtils.getCoprocessorServiceRequest(method, request);
2615            return this.master.execMasterService(getRpcController(), csr);
2616          }
2617        }) {
2618          // TODO: Are we retrying here? Does not seem so. We should use RetryingRpcCaller
2619          callable.prepare(false);
2620          int operationTimeout = connection.getConnectionConfiguration().getOperationTimeout();
2621          CoprocessorServiceResponse result = callable.call(operationTimeout);
2622          return CoprocessorRpcUtils.getResponse(result, responsePrototype);
2623        }
2624      }
2625    };
2626  }
2627
2628  @Override
2629  public CoprocessorRpcChannel coprocessorService(final ServerName serverName) {
2630    return new SyncCoprocessorRpcChannel() {
2631      @Override
2632      protected Message callExecService(RpcController controller,
2633          Descriptors.MethodDescriptor method, Message request, Message responsePrototype)
2634      throws IOException {
2635        if (LOG.isTraceEnabled()) {
2636          LOG.trace("Call: " + method.getName() + ", " + request.toString());
2637        }
2638        CoprocessorServiceRequest csr =
2639            CoprocessorRpcUtils.getCoprocessorServiceRequest(method, request);
2640        // TODO: Are we retrying here? Does not seem so. We should use RetryingRpcCaller
2641        // TODO: Make this same as RegionCoprocessorRpcChannel and MasterCoprocessorRpcChannel. They
2642        // are all different though should do same thing; e.g. RpcChannel setup.
2643        ClientProtos.ClientService.BlockingInterface stub = connection.getClient(serverName);
2644        CoprocessorServiceResponse result;
2645        try {
2646          result = stub.
2647              execRegionServerService(connection.getRpcControllerFactory().newController(), csr);
2648          return CoprocessorRpcUtils.getResponse(result, responsePrototype);
2649        } catch (ServiceException e) {
2650          throw ProtobufUtil.handleRemoteException(e);
2651        }
2652      }
2653    };
2654  }
2655
2656  @Override
2657  public void updateConfiguration(final ServerName server) throws IOException {
2658    final AdminService.BlockingInterface admin = this.connection.getAdmin(server);
2659    Callable<Void> callable = new Callable<Void>() {
2660      @Override
2661      public Void call() throws Exception {
2662        admin.updateConfiguration(null, UpdateConfigurationRequest.getDefaultInstance());
2663        return null;
2664      }
2665    };
2666    ProtobufUtil.call(callable);
2667  }
2668
2669  @Override
2670  public void updateConfiguration() throws IOException {
2671    ClusterMetrics status = getClusterMetrics(
2672      EnumSet.of(Option.LIVE_SERVERS, Option.MASTER, Option.BACKUP_MASTERS));
2673    for (ServerName server : status.getLiveServerMetrics().keySet()) {
2674      updateConfiguration(server);
2675    }
2676
2677    updateConfiguration(status.getMasterName());
2678
2679    for (ServerName server : status.getBackupMasterNames()) {
2680      updateConfiguration(server);
2681    }
2682  }
2683
2684  @Override
2685  public long getLastMajorCompactionTimestamp(final TableName tableName) throws IOException {
2686    return executeCallable(new MasterCallable<Long>(getConnection(), getRpcControllerFactory()) {
2687      @Override
2688      protected Long rpcCall() throws Exception {
2689        MajorCompactionTimestampRequest req =
2690            MajorCompactionTimestampRequest.newBuilder()
2691                .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
2692        return master.getLastMajorCompactionTimestamp(getRpcController(), req).
2693            getCompactionTimestamp();
2694      }
2695    });
2696  }
2697
2698  @Override
2699  public long getLastMajorCompactionTimestampForRegion(final byte[] regionName) throws IOException {
2700    return executeCallable(new MasterCallable<Long>(getConnection(), getRpcControllerFactory()) {
2701      @Override
2702      protected Long rpcCall() throws Exception {
2703        MajorCompactionTimestampForRegionRequest req =
2704            MajorCompactionTimestampForRegionRequest.newBuilder().setRegion(RequestConverter
2705                      .buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)).build();
2706        return master.getLastMajorCompactionTimestampForRegion(getRpcController(), req)
2707            .getCompactionTimestamp();
2708      }
2709    });
2710  }
2711
2712  /**
2713   * {@inheritDoc}
2714   */
2715  @Override
2716  public void compact(final TableName tableName, final byte[] columnFamily, CompactType compactType)
2717    throws IOException, InterruptedException {
2718    compact(tableName, columnFamily, false, compactType);
2719  }
2720
2721  /**
2722   * {@inheritDoc}
2723   */
2724  @Override
2725  public void compact(final TableName tableName, CompactType compactType)
2726    throws IOException, InterruptedException {
2727    compact(tableName, null, false, compactType);
2728  }
2729
2730  /**
2731   * {@inheritDoc}
2732   */
2733  @Override
2734  public void majorCompact(final TableName tableName, final byte[] columnFamily,
2735    CompactType compactType) throws IOException, InterruptedException {
2736    compact(tableName, columnFamily, true, compactType);
2737  }
2738
2739  /**
2740   * {@inheritDoc}
2741   */
2742  @Override
2743  public void majorCompact(final TableName tableName, CompactType compactType)
2744          throws IOException, InterruptedException {
2745    compact(tableName, null, true, compactType);
2746  }
2747
  /**
   * {@inheritDoc}
   * <p>
   * Implementation notes:
   * <ul>
   * <li>The table must exist ({@code checkTableExists} is called first). A table that is not
   * enabled always reports {@code NONE}.</li>
   * <li>{@code MOB}: the state is read from the master's admin interface, using the region info
   * created by {@code RegionInfo.createMobRegionInfo} for the table.</li>
   * <li>{@code NORMAL}: each located region with a known server is asked for its state and the
   * answers are merged: {@code MAJOR_AND_MINOR} is returned as soon as any region reports it or
   * as soon as both {@code MAJOR} and {@code MINOR} have been seen; otherwise the last
   * {@code MAJOR}/{@code MINOR} seen wins, and {@code NONE} remains if no region reported
   * anything else.</li>
   * </ul>
   *
   * @throws IllegalArgumentException if {@code compactType} is neither {@code MOB} nor
   *           {@code NORMAL}
   */
  @Override
  public CompactionState getCompactionState(final TableName tableName, CompactType compactType)
      throws IOException {
    checkTableExists(tableName);
    if (!isTableEnabled(tableName)) {
      // If the table is disabled, the compaction state of the table should always be NONE
      return ProtobufUtil.createCompactionState(
        AdminProtos.GetRegionInfoResponse.CompactionState.NONE);
    }

    // Accumulated state; starts at NONE and is upgraded as regions report MAJOR or MINOR.
    AdminProtos.GetRegionInfoResponse.CompactionState state =
      AdminProtos.GetRegionInfoResponse.CompactionState.NONE;

    // TODO: There is no timeout on this controller. Set one!
    // The same controller instance is passed to every getRegionInfo call below.
    HBaseRpcController rpcController = rpcControllerFactory.newController();
    switch (compactType) {
      case MOB:
        // MOB state is asked from the master rather than from a region server.
        final AdminProtos.AdminService.BlockingInterface masterAdmin =
          this.connection.getAdminForMaster();
        Callable<AdminProtos.GetRegionInfoResponse.CompactionState> callable =
          new Callable<AdminProtos.GetRegionInfoResponse.CompactionState>() {
            @Override
            public AdminProtos.GetRegionInfoResponse.CompactionState call() throws Exception {
              RegionInfo info = RegionInfo.createMobRegionInfo(tableName);
              GetRegionInfoRequest request =
                RequestConverter.buildGetRegionInfoRequest(info.getRegionName(), true);
              GetRegionInfoResponse response = masterAdmin.getRegionInfo(rpcController, request);
              return response.getCompactionState();
            }
          };
        state = ProtobufUtil.call(callable);
        break;
      case NORMAL:
        for (HRegionLocation loc : connection.locateRegions(tableName, false, false)) {
          ServerName sn = loc.getServerName();
          if (sn == null) {
            // No server is known for this region, so there is nobody to ask; skip it.
            continue;
          }
          byte[] regionName = loc.getRegion().getRegionName();
          AdminService.BlockingInterface snAdmin = this.connection.getAdmin(sn);
          try {
            Callable<GetRegionInfoResponse> regionInfoCallable =
              new Callable<GetRegionInfoResponse>() {
                @Override
                public GetRegionInfoResponse call() throws Exception {
                  GetRegionInfoRequest request =
                    RequestConverter.buildGetRegionInfoRequest(regionName, true);
                  return snAdmin.getRegionInfo(rpcController, request);
                }
              };
            GetRegionInfoResponse response = ProtobufUtil.call(regionInfoCallable);
            // Merge this region's state into the accumulated one. MAJOR_AND_MINOR cannot be
            // upgraded further, so those cases return immediately.
            switch (response.getCompactionState()) {
              case MAJOR_AND_MINOR:
                return CompactionState.MAJOR_AND_MINOR;
              case MAJOR:
                if (state == AdminProtos.GetRegionInfoResponse.CompactionState.MINOR) {
                  return CompactionState.MAJOR_AND_MINOR;
                }
                state = AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR;
                break;
              case MINOR:
                if (state == AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR) {
                  return CompactionState.MAJOR_AND_MINOR;
                }
                state = AdminProtos.GetRegionInfoResponse.CompactionState.MINOR;
                break;
              case NONE:
              default: // nothing, continue
            }
          } catch (NotServingRegionException e) {
            // The server no longer serves this region: log at debug and go on to the next one.
            if (LOG.isDebugEnabled()) {
              LOG.debug("Trying to get compaction state of " + loc.getRegion() + ": " +
                StringUtils.stringifyException(e));
            }
          } catch (RemoteException e) {
            // A remote exception is tolerated only when its message names
            // NotServingRegionException; anything else is propagated to the caller.
            if (e.getMessage().indexOf(NotServingRegionException.class.getName()) >= 0) {
              if (LOG.isDebugEnabled()) {
                LOG.debug("Trying to get compaction state of " + loc.getRegion() + ": " +
                  StringUtils.stringifyException(e));
              }
            } else {
              throw e;
            }
          }
        }
        break;
      default:
        throw new IllegalArgumentException("Unknown compactType: " + compactType);
    }
    // NOTE(review): state starts non-null and the NORMAL branch only assigns enum constants, so
    // null can presumably only come from the MOB call above -- confirm whether that can happen.
    if (state != null) {
      return ProtobufUtil.createCompactionState(state);
    }
    return null;
  }
2845
2846  /**
2847   * Future that waits on a procedure result.
2848   * Returned by the async version of the Admin calls,
2849   * and used internally by the sync calls to wait on the result of the procedure.
2850   */
2851  @InterfaceAudience.Private
2852  @InterfaceStability.Evolving
2853  protected static class ProcedureFuture<V> implements Future<V> {
2854    private ExecutionException exception = null;
2855    private boolean procResultFound = false;
2856    private boolean done = false;
2857    private boolean cancelled = false;
2858    private V result = null;
2859
2860    private final HBaseAdmin admin;
2861    protected final Long procId;
2862
2863    public ProcedureFuture(final HBaseAdmin admin, final Long procId) {
2864      this.admin = admin;
2865      this.procId = procId;
2866    }
2867
2868    @Override
2869    public boolean cancel(boolean mayInterruptIfRunning) {
2870      AbortProcedureRequest abortProcRequest = AbortProcedureRequest.newBuilder()
2871          .setProcId(procId).setMayInterruptIfRunning(mayInterruptIfRunning).build();
2872      try {
2873        cancelled = abortProcedureResult(abortProcRequest).getIsProcedureAborted();
2874        if (cancelled) {
2875          done = true;
2876        }
2877      } catch (IOException e) {
2878        // Cancell thrown exception for some reason. At this time, we are not sure whether
2879        // the cancell succeeds or fails. We assume that it is failed, but print out a warning
2880        // for debugging purpose.
2881        LOG.warn(
2882          "Cancelling the procedure with procId=" + procId + " throws exception " + e.getMessage(),
2883          e);
2884        cancelled = false;
2885      }
2886      return cancelled;
2887    }
2888
2889    @Override
2890    public boolean isCancelled() {
2891      return cancelled;
2892    }
2893
2894    protected AbortProcedureResponse abortProcedureResult(
2895        final AbortProcedureRequest request) throws IOException {
2896      return admin.executeCallable(new MasterCallable<AbortProcedureResponse>(
2897          admin.getConnection(), admin.getRpcControllerFactory()) {
2898        @Override
2899        protected AbortProcedureResponse rpcCall() throws Exception {
2900          return master.abortProcedure(getRpcController(), request);
2901        }
2902      });
2903    }
2904
2905    @Override
2906    public V get() throws InterruptedException, ExecutionException {
2907      // TODO: should we ever spin forever?
2908      // fix HBASE-21715. TODO: If the function call get() without timeout limit is not allowed,
2909      // is it possible to compose instead of inheriting from the class Future for this class?
2910      try {
2911        return get(admin.getProcedureTimeout, TimeUnit.MILLISECONDS);
2912      } catch (TimeoutException e) {
2913        LOG.warn("Failed to get the procedure with procId=" + procId + " throws exception " + e
2914            .getMessage(), e);
2915        return null;
2916      }
2917    }
2918
2919    @Override
2920    public V get(long timeout, TimeUnit unit)
2921        throws InterruptedException, ExecutionException, TimeoutException {
2922      if (!done) {
2923        long deadlineTs = EnvironmentEdgeManager.currentTime() + unit.toMillis(timeout);
2924        try {
2925          try {
2926            // if the master support procedures, try to wait the result
2927            if (procId != null) {
2928              result = waitProcedureResult(procId, deadlineTs);
2929            }
2930            // if we don't have a proc result, try the compatibility wait
2931            if (!procResultFound) {
2932              result = waitOperationResult(deadlineTs);
2933            }
2934            result = postOperationResult(result, deadlineTs);
2935            done = true;
2936          } catch (IOException e) {
2937            result = postOperationFailure(e, deadlineTs);
2938            done = true;
2939          }
2940        } catch (IOException e) {
2941          exception = new ExecutionException(e);
2942          done = true;
2943        }
2944      }
2945      if (exception != null) {
2946        throw exception;
2947      }
2948      return result;
2949    }
2950
2951    @Override
2952    public boolean isDone() {
2953      return done;
2954    }
2955
2956    protected HBaseAdmin getAdmin() {
2957      return admin;
2958    }
2959
2960    private V waitProcedureResult(long procId, long deadlineTs)
2961        throws IOException, TimeoutException, InterruptedException {
2962      GetProcedureResultRequest request = GetProcedureResultRequest.newBuilder()
2963          .setProcId(procId)
2964          .build();
2965
2966      int tries = 0;
2967      IOException serviceEx = null;
2968      while (EnvironmentEdgeManager.currentTime() < deadlineTs) {
2969        GetProcedureResultResponse response = null;
2970        try {
2971          // Try to fetch the result
2972          response = getProcedureResult(request);
2973        } catch (IOException e) {
2974          serviceEx = unwrapException(e);
2975
2976          // the master may be down
2977          LOG.warn("failed to get the procedure result procId=" + procId, serviceEx);
2978
2979          // Not much to do, if we have a DoNotRetryIOException
2980          if (serviceEx instanceof DoNotRetryIOException) {
2981            // TODO: looks like there is no way to unwrap this exception and get the proper
2982            // UnsupportedOperationException aside from looking at the message.
2983            // anyway, if we fail here we just failover to the compatibility side
2984            // and that is always a valid solution.
2985            LOG.warn("Proc-v2 is unsupported on this master: " + serviceEx.getMessage(), serviceEx);
2986            procResultFound = false;
2987            return null;
2988          }
2989        }
2990
2991        // If the procedure is no longer running, we should have a result
2992        if (response != null && response.getState() != GetProcedureResultResponse.State.RUNNING) {
2993          procResultFound = response.getState() != GetProcedureResultResponse.State.NOT_FOUND;
2994          return convertResult(response);
2995        }
2996
2997        try {
2998          Thread.sleep(getAdmin().getPauseTime(tries++));
2999        } catch (InterruptedException e) {
3000          throw new InterruptedException(
3001            "Interrupted while waiting for the result of proc " + procId);
3002        }
3003      }
3004      if (serviceEx != null) {
3005        throw serviceEx;
3006      } else {
3007        throw new TimeoutException("The procedure " + procId + " is still running");
3008      }
3009    }
3010
3011    private static IOException unwrapException(IOException e) {
3012      if (e instanceof RemoteException) {
3013        return ((RemoteException)e).unwrapRemoteException();
3014      }
3015      return e;
3016    }
3017
3018    protected GetProcedureResultResponse getProcedureResult(final GetProcedureResultRequest request)
3019        throws IOException {
3020      return admin.executeCallable(new MasterCallable<GetProcedureResultResponse>(
3021          admin.getConnection(), admin.getRpcControllerFactory()) {
3022        @Override
3023        protected GetProcedureResultResponse rpcCall() throws Exception {
3024          return master.getProcedureResult(getRpcController(), request);
3025        }
3026      });
3027    }
3028
3029    /**
3030     * Convert the procedure result response to a specified type.
3031     * @param response the procedure result object to parse
3032     * @return the result data of the procedure.
3033     */
3034    protected V convertResult(final GetProcedureResultResponse response) throws IOException {
3035      if (response.hasException()) {
3036        throw ForeignExceptionUtil.toIOException(response.getException());
3037      }
3038      return null;
3039    }
3040
3041    /**
3042     * Fallback implementation in case the procedure is not supported by the server.
3043     * It should try to wait until the operation is completed.
3044     * @param deadlineTs the timestamp after which this method should throw a TimeoutException
3045     * @return the result data of the operation
3046     */
3047    protected V waitOperationResult(final long deadlineTs)
3048        throws IOException, TimeoutException {
3049      return null;
3050    }
3051
3052    /**
3053     * Called after the operation is completed and the result fetched. this allows to perform extra
3054     * steps after the procedure is completed. it allows to apply transformations to the result that
3055     * will be returned by get().
3056     * @param result the result of the procedure
3057     * @param deadlineTs the timestamp after which this method should throw a TimeoutException
3058     * @return the result of the procedure, which may be the same as the passed one
3059     */
3060    protected V postOperationResult(final V result, final long deadlineTs)
3061        throws IOException, TimeoutException {
3062      return result;
3063    }
3064
3065    /**
3066     * Called after the operation is terminated with a failure.
3067     * this allows to perform extra steps after the procedure is terminated.
3068     * it allows to apply transformations to the result that will be returned by get().
3069     * The default implementation will rethrow the exception
3070     * @param exception the exception got from fetching the result
3071     * @param deadlineTs the timestamp after which this method should throw a TimeoutException
3072     * @return the result of the procedure, which may be the same as the passed one
3073     */
3074    protected V postOperationFailure(final IOException exception, final long deadlineTs)
3075        throws IOException, TimeoutException {
3076      throw exception;
3077    }
3078
3079    protected interface WaitForStateCallable {
3080      boolean checkState(int tries) throws IOException;
3081      void throwInterruptedException() throws InterruptedIOException;
3082      void throwTimeoutException(long elapsed) throws TimeoutException;
3083    }
3084
3085    protected void waitForState(final long deadlineTs, final WaitForStateCallable callable)
3086        throws IOException, TimeoutException {
3087      int tries = 0;
3088      IOException serverEx = null;
3089      long startTime = EnvironmentEdgeManager.currentTime();
3090      while (EnvironmentEdgeManager.currentTime() < deadlineTs) {
3091        serverEx = null;
3092        try {
3093          if (callable.checkState(tries)) {
3094            return;
3095          }
3096        } catch (IOException e) {
3097          serverEx = e;
3098        }
3099        try {
3100          Thread.sleep(getAdmin().getPauseTime(tries++));
3101        } catch (InterruptedException e) {
3102          callable.throwInterruptedException();
3103        }
3104      }
3105      if (serverEx != null) {
3106        throw unwrapException(serverEx);
3107      } else {
3108        callable.throwTimeoutException(EnvironmentEdgeManager.currentTime() - startTime);
3109      }
3110    }
3111  }
3112
3113  @InterfaceAudience.Private
3114  @InterfaceStability.Evolving
3115  protected static abstract class TableFuture<V> extends ProcedureFuture<V> {
3116    private final TableName tableName;
3117
3118    public TableFuture(final HBaseAdmin admin, final TableName tableName, final Long procId) {
3119      super(admin, procId);
3120      this.tableName = tableName;
3121    }
3122
3123    @Override
3124    public String toString() {
3125      return getDescription();
3126    }
3127
3128    /**
3129     * @return the table name
3130     */
3131    protected TableName getTableName() {
3132      return tableName;
3133    }
3134
3135    /**
3136     * @return the table descriptor
3137     */
3138    protected TableDescriptor getDescriptor() throws IOException {
3139      return getAdmin().getDescriptor(getTableName());
3140    }
3141
3142    /**
3143     * @return the operation type like CREATE, DELETE, DISABLE etc.
3144     */
3145    public abstract String getOperationType();
3146
3147    /**
3148     * @return a description of the operation
3149     */
3150    protected String getDescription() {
3151      return "Operation: " + getOperationType() + ", " + "Table Name: " +
3152        tableName.getNameWithNamespaceInclAsString() + ", procId: " + procId;
3153    }
3154
3155    protected abstract class TableWaitForStateCallable implements WaitForStateCallable {
3156      @Override
3157      public void throwInterruptedException() throws InterruptedIOException {
3158        throw new InterruptedIOException("Interrupted while waiting for " + getDescription());
3159      }
3160
3161      @Override
3162      public void throwTimeoutException(long elapsedTime) throws TimeoutException {
3163        throw new TimeoutException(
3164          getDescription() + " has not completed after " + elapsedTime + "ms");
3165      }
3166    }
3167
3168    @Override
3169    protected V postOperationResult(final V result, final long deadlineTs)
3170        throws IOException, TimeoutException {
3171      LOG.info(getDescription() + " completed");
3172      return super.postOperationResult(result, deadlineTs);
3173    }
3174
3175    @Override
3176    protected V postOperationFailure(final IOException exception, final long deadlineTs)
3177        throws IOException, TimeoutException {
3178      LOG.info(getDescription() + " failed with " + exception.getMessage());
3179      return super.postOperationFailure(exception, deadlineTs);
3180    }
3181
3182    protected void waitForTableEnabled(final long deadlineTs)
3183        throws IOException, TimeoutException {
3184      waitForState(deadlineTs, new TableWaitForStateCallable() {
3185        @Override
3186        public boolean checkState(int tries) throws IOException {
3187          try {
3188            if (getAdmin().isTableAvailable(tableName)) {
3189              return true;
3190            }
3191          } catch (TableNotFoundException tnfe) {
3192            LOG.debug("Table " + tableName.getNameWithNamespaceInclAsString()
3193                + " was not enabled, sleeping. tries=" + tries);
3194          }
3195          return false;
3196        }
3197      });
3198    }
3199
3200    protected void waitForTableDisabled(final long deadlineTs)
3201        throws IOException, TimeoutException {
3202      waitForState(deadlineTs, new TableWaitForStateCallable() {
3203        @Override
3204        public boolean checkState(int tries) throws IOException {
3205          return getAdmin().isTableDisabled(tableName);
3206        }
3207      });
3208    }
3209
3210    protected void waitTableNotFound(final long deadlineTs)
3211        throws IOException, TimeoutException {
3212      waitForState(deadlineTs, new TableWaitForStateCallable() {
3213        @Override
3214        public boolean checkState(int tries) throws IOException {
3215          return !getAdmin().tableExists(tableName);
3216        }
3217      });
3218    }
3219
3220    protected void waitForAllRegionsOnline(final long deadlineTs, final byte[][] splitKeys)
3221        throws IOException, TimeoutException {
3222      final TableDescriptor desc = getDescriptor();
3223      final AtomicInteger actualRegCount = new AtomicInteger(0);
3224      final MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
3225        @Override
3226        public boolean visit(Result rowResult) throws IOException {
3227          RegionLocations list = MetaTableAccessor.getRegionLocations(rowResult);
3228          if (list == null) {
3229            LOG.warn("No serialized HRegionInfo in " + rowResult);
3230            return true;
3231          }
3232          HRegionLocation l = list.getRegionLocation();
3233          if (l == null) {
3234            return true;
3235          }
3236          if (!l.getRegion().getTable().equals(desc.getTableName())) {
3237            return false;
3238          }
3239          if (l.getRegion().isOffline() || l.getRegion().isSplit()) {
3240            return true;
3241          }
3242          HRegionLocation[] locations = list.getRegionLocations();
3243          for (HRegionLocation location : locations) {
3244            if (location == null) continue;
3245            ServerName serverName = location.getServerName();
3246            // Make sure that regions are assigned to server
3247            if (serverName != null && serverName.getAddress() != null) {
3248              actualRegCount.incrementAndGet();
3249            }
3250          }
3251          return true;
3252        }
3253      };
3254
3255      int tries = 0;
3256      int numRegs = (splitKeys == null ? 1 : splitKeys.length + 1) * desc.getRegionReplication();
3257      while (EnvironmentEdgeManager.currentTime() < deadlineTs) {
3258        actualRegCount.set(0);
3259        MetaTableAccessor.scanMetaForTableRegions(getAdmin().getConnection(), visitor,
3260          desc.getTableName());
3261        if (actualRegCount.get() == numRegs) {
3262          // all the regions are online
3263          return;
3264        }
3265
3266        try {
3267          Thread.sleep(getAdmin().getPauseTime(tries++));
3268        } catch (InterruptedException e) {
3269          throw new InterruptedIOException("Interrupted when opening" + " regions; "
3270              + actualRegCount.get() + " of " + numRegs + " regions processed so far");
3271        }
3272      }
3273      throw new TimeoutException("Only " + actualRegCount.get() + " of " + numRegs
3274          + " regions are online; retries exhausted.");
3275    }
3276  }
3277
3278  @InterfaceAudience.Private
3279  @InterfaceStability.Evolving
3280  protected static abstract class NamespaceFuture extends ProcedureFuture<Void> {
3281    private final String namespaceName;
3282
3283    public NamespaceFuture(final HBaseAdmin admin, final String namespaceName, final Long procId) {
3284      super(admin, procId);
3285      this.namespaceName = namespaceName;
3286    }
3287
3288    /**
3289     * @return the namespace name
3290     */
3291    protected String getNamespaceName() {
3292      return namespaceName;
3293    }
3294
3295    /**
3296     * @return the operation type like CREATE_NAMESPACE, DELETE_NAMESPACE, etc.
3297     */
3298    public abstract String getOperationType();
3299
3300    @Override
3301    public String toString() {
3302      return "Operation: " + getOperationType() + ", Namespace: " + getNamespaceName();
3303    }
3304  }
3305
3306  @InterfaceAudience.Private
3307  @InterfaceStability.Evolving
3308  private static class ReplicationFuture extends ProcedureFuture<Void> {
3309    private final String peerId;
3310    private final Supplier<String> getOperation;
3311
3312    public ReplicationFuture(HBaseAdmin admin, String peerId, Long procId,
3313        Supplier<String> getOperation) {
3314      super(admin, procId);
3315      this.peerId = peerId;
3316      this.getOperation = getOperation;
3317    }
3318
3319    @Override
3320    public String toString() {
3321      return "Operation: " + getOperation.get() + ", peerId: " + peerId;
3322    }
3323  }
3324
3325  @Override
3326  public List<SecurityCapability> getSecurityCapabilities() throws IOException {
3327    try {
3328      return executeCallable(new MasterCallable<List<SecurityCapability>>(getConnection(),
3329          getRpcControllerFactory()) {
3330        @Override
3331        protected List<SecurityCapability> rpcCall() throws Exception {
3332          SecurityCapabilitiesRequest req = SecurityCapabilitiesRequest.newBuilder().build();
3333          return ProtobufUtil.toSecurityCapabilityList(
3334            master.getSecurityCapabilities(getRpcController(), req).getCapabilitiesList());
3335        }
3336      });
3337    } catch (IOException e) {
3338      if (e instanceof RemoteException) {
3339        e = ((RemoteException)e).unwrapRemoteException();
3340      }
3341      throw e;
3342    }
3343  }
3344
3345  @Override
3346  public boolean splitSwitch(boolean enabled, boolean synchronous) throws IOException {
3347    return splitOrMergeSwitch(enabled, synchronous, MasterSwitchType.SPLIT);
3348  }
3349
3350  @Override
3351  public boolean mergeSwitch(boolean enabled, boolean synchronous) throws IOException {
3352    return splitOrMergeSwitch(enabled, synchronous, MasterSwitchType.MERGE);
3353  }
3354
3355  private boolean splitOrMergeSwitch(boolean enabled, boolean synchronous,
3356      MasterSwitchType switchType) throws IOException {
3357    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
3358      @Override
3359      protected Boolean rpcCall() throws Exception {
3360        MasterProtos.SetSplitOrMergeEnabledResponse response = master.setSplitOrMergeEnabled(
3361          getRpcController(),
3362          RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType));
3363        return response.getPrevValueList().get(0);
3364      }
3365    });
3366  }
3367
3368  @Override
3369  public boolean isSplitEnabled() throws IOException {
3370    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
3371      @Override
3372      protected Boolean rpcCall() throws Exception {
3373        return master.isSplitOrMergeEnabled(getRpcController(),
3374          RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.SPLIT)).getEnabled();
3375      }
3376    });
3377  }
3378
3379  @Override
3380  public boolean isMergeEnabled() throws IOException {
3381    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
3382      @Override
3383      protected Boolean rpcCall() throws Exception {
3384        return master.isSplitOrMergeEnabled(getRpcController(),
3385          RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.MERGE)).getEnabled();
3386      }
3387    });
3388  }
3389
3390  private RpcControllerFactory getRpcControllerFactory() {
3391    return this.rpcControllerFactory;
3392  }
3393
3394  @Override
3395  public Future<Void> addReplicationPeerAsync(String peerId, ReplicationPeerConfig peerConfig,
3396      boolean enabled) throws IOException {
3397    AddReplicationPeerResponse response = executeCallable(
3398      new MasterCallable<AddReplicationPeerResponse>(getConnection(), getRpcControllerFactory()) {
3399        @Override
3400        protected AddReplicationPeerResponse rpcCall() throws Exception {
3401          return master.addReplicationPeer(getRpcController(),
3402            RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled));
3403        }
3404      });
3405    return new ReplicationFuture(this, peerId, response.getProcId(), () -> "ADD_REPLICATION_PEER");
3406  }
3407
3408  @Override
3409  public Future<Void> removeReplicationPeerAsync(String peerId) throws IOException {
3410    RemoveReplicationPeerResponse response =
3411      executeCallable(new MasterCallable<RemoveReplicationPeerResponse>(getConnection(),
3412          getRpcControllerFactory()) {
3413        @Override
3414        protected RemoveReplicationPeerResponse rpcCall() throws Exception {
3415          return master.removeReplicationPeer(getRpcController(),
3416            RequestConverter.buildRemoveReplicationPeerRequest(peerId));
3417        }
3418      });
3419    return new ReplicationFuture(this, peerId, response.getProcId(),
3420      () -> "REMOVE_REPLICATION_PEER");
3421  }
3422
3423  @Override
3424  public Future<Void> enableReplicationPeerAsync(final String peerId) throws IOException {
3425    EnableReplicationPeerResponse response =
3426      executeCallable(new MasterCallable<EnableReplicationPeerResponse>(getConnection(),
3427          getRpcControllerFactory()) {
3428        @Override
3429        protected EnableReplicationPeerResponse rpcCall() throws Exception {
3430          return master.enableReplicationPeer(getRpcController(),
3431            RequestConverter.buildEnableReplicationPeerRequest(peerId));
3432        }
3433      });
3434    return new ReplicationFuture(this, peerId, response.getProcId(),
3435      () -> "ENABLE_REPLICATION_PEER");
3436  }
3437
3438  @Override
3439  public Future<Void> disableReplicationPeerAsync(final String peerId) throws IOException {
3440    DisableReplicationPeerResponse response =
3441      executeCallable(new MasterCallable<DisableReplicationPeerResponse>(getConnection(),
3442          getRpcControllerFactory()) {
3443        @Override
3444        protected DisableReplicationPeerResponse rpcCall() throws Exception {
3445          return master.disableReplicationPeer(getRpcController(),
3446            RequestConverter.buildDisableReplicationPeerRequest(peerId));
3447        }
3448      });
3449    return new ReplicationFuture(this, peerId, response.getProcId(),
3450      () -> "DISABLE_REPLICATION_PEER");
3451  }
3452
3453  @Override
3454  public ReplicationPeerConfig getReplicationPeerConfig(final String peerId) throws IOException {
3455    return executeCallable(new MasterCallable<ReplicationPeerConfig>(getConnection(),
3456        getRpcControllerFactory()) {
3457      @Override
3458      protected ReplicationPeerConfig rpcCall() throws Exception {
3459        GetReplicationPeerConfigResponse response = master.getReplicationPeerConfig(
3460          getRpcController(), RequestConverter.buildGetReplicationPeerConfigRequest(peerId));
3461        return ReplicationPeerConfigUtil.convert(response.getPeerConfig());
3462      }
3463    });
3464  }
3465
3466  @Override
3467  public Future<Void> updateReplicationPeerConfigAsync(final String peerId,
3468      final ReplicationPeerConfig peerConfig) throws IOException {
3469    UpdateReplicationPeerConfigResponse response =
3470      executeCallable(new MasterCallable<UpdateReplicationPeerConfigResponse>(getConnection(),
3471          getRpcControllerFactory()) {
3472        @Override
3473        protected UpdateReplicationPeerConfigResponse rpcCall() throws Exception {
3474          return master.updateReplicationPeerConfig(getRpcController(),
3475            RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig));
3476        }
3477      });
3478    return new ReplicationFuture(this, peerId, response.getProcId(),
3479      () -> "UPDATE_REPLICATION_PEER_CONFIG");
3480  }
3481
3482  @Override
3483  public Future<Void> transitReplicationPeerSyncReplicationStateAsync(String peerId,
3484      SyncReplicationState state) throws IOException {
3485    TransitReplicationPeerSyncReplicationStateResponse response =
3486        executeCallable(new MasterCallable<TransitReplicationPeerSyncReplicationStateResponse>(
3487          getConnection(), getRpcControllerFactory()) {
3488          @Override
3489          protected TransitReplicationPeerSyncReplicationStateResponse rpcCall() throws Exception {
3490            return master.transitReplicationPeerSyncReplicationState(getRpcController(),
3491              RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId,
3492                state));
3493          }
3494        });
3495    return new ReplicationFuture(this, peerId, response.getProcId(),
3496      () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE");
3497  }
3498
3499  @Override
3500  public List<ReplicationPeerDescription> listReplicationPeers() throws IOException {
3501    return listReplicationPeers((Pattern)null);
3502  }
3503
3504  @Override
3505  public List<ReplicationPeerDescription> listReplicationPeers(Pattern pattern)
3506      throws IOException {
3507    return executeCallable(new MasterCallable<List<ReplicationPeerDescription>>(getConnection(),
3508        getRpcControllerFactory()) {
3509      @Override
3510      protected List<ReplicationPeerDescription> rpcCall() throws Exception {
3511        List<ReplicationProtos.ReplicationPeerDescription> peersList = master.listReplicationPeers(
3512          getRpcController(), RequestConverter.buildListReplicationPeersRequest(pattern))
3513            .getPeerDescList();
3514        List<ReplicationPeerDescription> result = new ArrayList<>(peersList.size());
3515        for (ReplicationProtos.ReplicationPeerDescription peer : peersList) {
3516          result.add(ReplicationPeerConfigUtil.toReplicationPeerDescription(peer));
3517        }
3518        return result;
3519      }
3520    });
3521  }
3522
3523  @Override
3524  public void decommissionRegionServers(List<ServerName> servers, boolean offload)
3525      throws IOException {
3526    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
3527      @Override
3528      public Void rpcCall() throws ServiceException {
3529        master.decommissionRegionServers(getRpcController(),
3530          RequestConverter.buildDecommissionRegionServersRequest(servers, offload));
3531        return null;
3532      }
3533    });
3534  }
3535
3536  @Override
3537  public List<ServerName> listDecommissionedRegionServers() throws IOException {
3538    return executeCallable(new MasterCallable<List<ServerName>>(getConnection(),
3539              getRpcControllerFactory()) {
3540      @Override
3541      public List<ServerName> rpcCall() throws ServiceException {
3542        ListDecommissionedRegionServersRequest req =
3543            ListDecommissionedRegionServersRequest.newBuilder().build();
3544        List<ServerName> servers = new ArrayList<>();
3545        for (HBaseProtos.ServerName server : master
3546            .listDecommissionedRegionServers(getRpcController(), req).getServerNameList()) {
3547          servers.add(ProtobufUtil.toServerName(server));
3548        }
3549        return servers;
3550      }
3551    });
3552  }
3553
3554  @Override
3555  public void recommissionRegionServer(ServerName server, List<byte[]> encodedRegionNames)
3556      throws IOException {
3557    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
3558      @Override
3559      public Void rpcCall() throws ServiceException {
3560        master.recommissionRegionServer(getRpcController(),
3561          RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames));
3562        return null;
3563      }
3564    });
3565  }
3566
3567  @Override
3568  public List<TableCFs> listReplicatedTableCFs() throws IOException {
3569    List<TableCFs> replicatedTableCFs = new ArrayList<>();
3570    List<TableDescriptor> tables = listTableDescriptors();
3571    tables.forEach(table -> {
3572      Map<String, Integer> cfs = new HashMap<>();
3573      Stream.of(table.getColumnFamilies())
3574          .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
3575          .forEach(column -> {
3576            cfs.put(column.getNameAsString(), column.getScope());
3577          });
3578      if (!cfs.isEmpty()) {
3579        replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
3580      }
3581    });
3582    return replicatedTableCFs;
3583  }
3584
3585  @Override
3586  public void enableTableReplication(final TableName tableName) throws IOException {
3587    if (tableName == null) {
3588      throw new IllegalArgumentException("Table name cannot be null");
3589    }
3590    if (!tableExists(tableName)) {
3591      throw new TableNotFoundException("Table '" + tableName.getNameAsString()
3592          + "' does not exists.");
3593    }
3594    byte[][] splits = getTableSplits(tableName);
3595    checkAndSyncTableDescToPeers(tableName, splits);
3596    setTableRep(tableName, true);
3597  }
3598
3599  @Override
3600  public void disableTableReplication(final TableName tableName) throws IOException {
3601    if (tableName == null) {
3602      throw new IllegalArgumentException("Table name is null");
3603    }
3604    if (!tableExists(tableName)) {
3605      throw new TableNotFoundException("Table '" + tableName.getNameAsString()
3606          + "' does not exists.");
3607    }
3608    setTableRep(tableName, false);
3609  }
3610
3611  /**
3612   * Connect to peer and check the table descriptor on peer:
3613   * <ol>
3614   * <li>Create the same table on peer when not exist.</li>
3615   * <li>Throw an exception if the table already has replication enabled on any of the column
3616   * families.</li>
3617   * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
3618   * </ol>
3619   * @param tableName name of the table to sync to the peer
3620   * @param splits table split keys
3621   * @throws IOException if a remote or network exception occurs
3622   */
3623  private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits)
3624      throws IOException {
3625    List<ReplicationPeerDescription> peers = listReplicationPeers();
3626    if (peers == null || peers.size() <= 0) {
3627      throw new IllegalArgumentException("Found no peer cluster for replication.");
3628    }
3629
3630    for (ReplicationPeerDescription peerDesc : peers) {
3631      if (peerDesc.getPeerConfig().needToReplicate(tableName)) {
3632        Configuration peerConf =
3633            ReplicationPeerConfigUtil.getPeerClusterConfiguration(this.conf, peerDesc);
3634        try (Connection conn = ConnectionFactory.createConnection(peerConf);
3635            Admin repHBaseAdmin = conn.getAdmin()) {
3636          TableDescriptor tableDesc = getDescriptor(tableName);
3637          TableDescriptor peerTableDesc = null;
3638          if (!repHBaseAdmin.tableExists(tableName)) {
3639            repHBaseAdmin.createTable(tableDesc, splits);
3640          } else {
3641            peerTableDesc = repHBaseAdmin.getDescriptor(tableName);
3642            if (peerTableDesc == null) {
3643              throw new IllegalArgumentException("Failed to get table descriptor for table "
3644                  + tableName.getNameAsString() + " from peer cluster " + peerDesc.getPeerId());
3645            }
3646            if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc,
3647              tableDesc) != 0) {
3648              throw new IllegalArgumentException("Table " + tableName.getNameAsString()
3649                  + " exists in peer cluster " + peerDesc.getPeerId()
3650                  + ", but the table descriptors are not same when compared with source cluster."
3651                  + " Thus can not enable the table's replication switch.");
3652            }
3653          }
3654        }
3655      }
3656    }
3657  }
3658
3659  /**
3660   * Set the table's replication switch if the table's replication switch is already not set.
3661   * @param tableName name of the table
3662   * @param enableRep is replication switch enable or disable
3663   * @throws IOException if a remote or network exception occurs
3664   */
3665  private void setTableRep(final TableName tableName, boolean enableRep) throws IOException {
3666    TableDescriptor tableDesc = getDescriptor(tableName);
3667    if (!tableDesc.matchReplicationScope(enableRep)) {
3668      int scope =
3669          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
3670      modifyTable(TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build());
3671    }
3672  }
3673
3674  @Override
3675  public void clearCompactionQueues(final ServerName sn, final Set<String> queues)
3676    throws IOException, InterruptedException {
3677    if (queues == null || queues.size() == 0) {
3678      throw new IllegalArgumentException("queues cannot be null or empty");
3679    }
3680    final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
3681    Callable<Void> callable = new Callable<Void>() {
3682      @Override
3683      public Void call() throws Exception {
3684        // TODO: There is no timeout on this controller. Set one!
3685        HBaseRpcController controller = rpcControllerFactory.newController();
3686        ClearCompactionQueuesRequest request =
3687                RequestConverter.buildClearCompactionQueuesRequest(queues);
3688        admin.clearCompactionQueues(controller, request);
3689        return null;
3690      }
3691    };
3692    ProtobufUtil.call(callable);
3693  }
3694
3695  @Override
3696  public List<ServerName> clearDeadServers(List<ServerName> servers) throws IOException {
3697    return executeCallable(new MasterCallable<List<ServerName>>(getConnection(),
3698            getRpcControllerFactory()) {
3699      @Override
3700      protected List<ServerName> rpcCall() throws Exception {
3701        ClearDeadServersRequest req = RequestConverter.
3702          buildClearDeadServersRequest(servers == null? Collections.emptyList(): servers);
3703        return ProtobufUtil.toServerNameList(
3704                master.clearDeadServers(getRpcController(), req).getServerNameList());
3705      }
3706    });
3707  }
3708
3709  @Override
3710  public void cloneTableSchema(final TableName tableName, final TableName newTableName,
3711      final boolean preserveSplits) throws IOException {
3712    checkTableExists(tableName);
3713    if (tableExists(newTableName)) {
3714      throw new TableExistsException(newTableName);
3715    }
3716    TableDescriptor htd = TableDescriptorBuilder.copy(newTableName, getDescriptor(tableName));
3717    if (preserveSplits) {
3718      createTable(htd, getTableSplits(tableName));
3719    } else {
3720      createTable(htd);
3721    }
3722  }
3723
3724  @Override
3725  public boolean switchRpcThrottle(final boolean enable) throws IOException {
3726    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
3727      @Override
3728      protected Boolean rpcCall() throws Exception {
3729        return this.master
3730            .switchRpcThrottle(getRpcController(), MasterProtos.SwitchRpcThrottleRequest
3731                .newBuilder().setRpcThrottleEnabled(enable).build())
3732            .getPreviousRpcThrottleEnabled();
3733      }
3734    });
3735  }
3736
3737  @Override
3738  public boolean isRpcThrottleEnabled() throws IOException {
3739    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
3740      @Override
3741      protected Boolean rpcCall() throws Exception {
3742        return this.master.isRpcThrottleEnabled(getRpcController(),
3743          IsRpcThrottleEnabledRequest.newBuilder().build()).getRpcThrottleEnabled();
3744      }
3745    });
3746  }
3747
3748  @Override
3749  public boolean exceedThrottleQuotaSwitch(final boolean enable) throws IOException {
3750    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
3751      @Override
3752      protected Boolean rpcCall() throws Exception {
3753        return this.master
3754            .switchExceedThrottleQuota(getRpcController(),
3755              MasterProtos.SwitchExceedThrottleQuotaRequest.newBuilder()
3756                  .setExceedThrottleQuotaEnabled(enable).build())
3757            .getPreviousExceedThrottleQuotaEnabled();
3758      }
3759    });
3760  }
3761
3762  @Override
3763  public Map<TableName, Long> getSpaceQuotaTableSizes() throws IOException {
3764    return executeCallable(
3765      new MasterCallable<Map<TableName, Long>>(getConnection(), getRpcControllerFactory()) {
3766        @Override
3767        protected Map<TableName, Long> rpcCall() throws Exception {
3768          GetSpaceQuotaRegionSizesResponse resp = master.getSpaceQuotaRegionSizes(
3769            getRpcController(), RequestConverter.buildGetSpaceQuotaRegionSizesRequest());
3770          Map<TableName, Long> tableSizes = new HashMap<>();
3771          for (RegionSizes sizes : resp.getSizesList()) {
3772            TableName tn = ProtobufUtil.toTableName(sizes.getTableName());
3773            tableSizes.put(tn, sizes.getSize());
3774          }
3775          return tableSizes;
3776        }
3777      });
3778  }
3779
3780  @Override
3781  public Map<TableName, SpaceQuotaSnapshot> getRegionServerSpaceQuotaSnapshots(
3782      ServerName serverName) throws IOException {
3783    final AdminService.BlockingInterface admin = this.connection.getAdmin(serverName);
3784    Callable<GetSpaceQuotaSnapshotsResponse> callable =
3785      new Callable<GetSpaceQuotaSnapshotsResponse>() {
3786        @Override
3787        public GetSpaceQuotaSnapshotsResponse call() throws Exception {
3788          return admin.getSpaceQuotaSnapshots(rpcControllerFactory.newController(),
3789            RequestConverter.buildGetSpaceQuotaSnapshotsRequest());
3790        }
3791      };
3792    GetSpaceQuotaSnapshotsResponse resp = ProtobufUtil.call(callable);
3793    Map<TableName, SpaceQuotaSnapshot> snapshots = new HashMap<>();
3794    for (TableQuotaSnapshot snapshot : resp.getSnapshotsList()) {
3795      snapshots.put(ProtobufUtil.toTableName(snapshot.getTableName()),
3796        SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot()));
3797    }
3798    return snapshots;
3799  }
3800
3801  @Override
3802  public SpaceQuotaSnapshot getCurrentSpaceQuotaSnapshot(String namespace) throws IOException {
3803    return executeCallable(
3804      new MasterCallable<SpaceQuotaSnapshot>(getConnection(), getRpcControllerFactory()) {
3805        @Override
3806        protected SpaceQuotaSnapshot rpcCall() throws Exception {
3807          GetQuotaStatesResponse resp = master.getQuotaStates(getRpcController(),
3808            RequestConverter.buildGetQuotaStatesRequest());
3809          for (GetQuotaStatesResponse.NamespaceQuotaSnapshot nsSnapshot : resp
3810            .getNsSnapshotsList()) {
3811            if (namespace.equals(nsSnapshot.getNamespace())) {
3812              return SpaceQuotaSnapshot.toSpaceQuotaSnapshot(nsSnapshot.getSnapshot());
3813            }
3814          }
3815          return null;
3816        }
3817      });
3818  }
3819
3820  @Override
3821  public SpaceQuotaSnapshot getCurrentSpaceQuotaSnapshot(TableName tableName) throws IOException {
3822    return executeCallable(
3823      new MasterCallable<SpaceQuotaSnapshot>(getConnection(), getRpcControllerFactory()) {
3824        @Override
3825        protected SpaceQuotaSnapshot rpcCall() throws Exception {
3826          GetQuotaStatesResponse resp = master.getQuotaStates(getRpcController(),
3827            RequestConverter.buildGetQuotaStatesRequest());
3828          HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName);
3829          for (GetQuotaStatesResponse.TableQuotaSnapshot tableSnapshot : resp
3830            .getTableSnapshotsList()) {
3831            if (protoTableName.equals(tableSnapshot.getTableName())) {
3832              return SpaceQuotaSnapshot.toSpaceQuotaSnapshot(tableSnapshot.getSnapshot());
3833            }
3834          }
3835          return null;
3836        }
3837      });
3838  }
3839
3840  @Override
3841  public void grant(UserPermission userPermission, boolean mergeExistingPermissions)
3842      throws IOException {
3843    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
3844      @Override
3845      protected Void rpcCall() throws Exception {
3846        GrantRequest req =
3847            ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions);
3848        this.master.grant(getRpcController(), req);
3849        return null;
3850      }
3851    });
3852  }
3853
3854  @Override
3855  public void revoke(UserPermission userPermission) throws IOException {
3856    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
3857      @Override
3858      protected Void rpcCall() throws Exception {
3859        RevokeRequest req = ShadedAccessControlUtil.buildRevokeRequest(userPermission);
3860        this.master.revoke(getRpcController(), req);
3861        return null;
3862      }
3863    });
3864  }
3865
3866  @Override
3867  public List<UserPermission>
3868      getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) throws IOException {
3869    return executeCallable(
3870      new MasterCallable<List<UserPermission>>(getConnection(), getRpcControllerFactory()) {
3871        @Override
3872        protected List<UserPermission> rpcCall() throws Exception {
3873          AccessControlProtos.GetUserPermissionsRequest req =
3874              ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest);
3875          AccessControlProtos.GetUserPermissionsResponse response =
3876              this.master.getUserPermissions(getRpcController(), req);
3877          return response.getUserPermissionList().stream()
3878              .map(userPermission -> ShadedAccessControlUtil.toUserPermission(userPermission))
3879              .collect(Collectors.toList());
3880        }
3881      });
3882  }
3883
3884  @Override
3885  public List<Boolean> hasUserPermissions(String userName, List<Permission> permissions)
3886      throws IOException {
3887    return executeCallable(
3888      new MasterCallable<List<Boolean>>(getConnection(), getRpcControllerFactory()) {
3889        @Override
3890        protected List<Boolean> rpcCall() throws Exception {
3891          HasUserPermissionsRequest request =
3892              ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions);
3893          return this.master.hasUserPermissions(getRpcController(), request)
3894              .getHasUserPermissionList();
3895        }
3896      });
3897  }
3898
  /**
   * No-op: the body is empty, so calling this method releases nothing.
   * NOTE(review): presumably the underlying connection is owned and closed by whoever created
   * it, which would be why nothing is released here -- confirm.
   */
  @Override
  public void close() {
  }
3902
  /**
   * Convenience overload that delegates to {@code splitRegionAsync(regionName, null)}.
   * NOTE(review): presumably a null second argument means "no explicit split point" -- confirm
   * against the two-argument overload.
   * @param regionName the region to split
   * @return the future returned by the two-argument overload
   * @throws IOException if the two-argument overload throws it
   */
  @Override
  public Future<Void> splitRegionAsync(byte[] regionName) throws IOException {
    return splitRegionAsync(regionName, null);
  }
3907
  /**
   * Convenience overload that delegates to {@code createTableAsync(desc, null)}.
   * NOTE(review): presumably a null second argument means "no pre-split keys" -- confirm against
   * the two-argument overload.
   * @param desc descriptor of the table to create
   * @return the future returned by the two-argument overload
   * @throws IOException if the two-argument overload throws it
   */
  @Override
  public Future<Void> createTableAsync(TableDescriptor desc) throws IOException {
    return createTableAsync(desc, null);
  }
3912}