001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import com.google.protobuf.Descriptors;
021import com.google.protobuf.Message;
022import com.google.protobuf.RpcController;
023
024import java.io.Closeable;
025import java.io.IOException;
026import java.io.InterruptedIOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.EnumSet;
031import java.util.HashMap;
032import java.util.Iterator;
033import java.util.LinkedList;
034import java.util.List;
035import java.util.Map;
036import java.util.Set;
037import java.util.concurrent.Callable;
038import java.util.concurrent.ExecutionException;
039import java.util.concurrent.Future;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.TimeoutException;
042import java.util.concurrent.atomic.AtomicInteger;
043import java.util.concurrent.atomic.AtomicReference;
044import java.util.regex.Pattern;
045import java.util.stream.Collectors;
046import java.util.stream.Stream;
047
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.hbase.Abortable;
050import org.apache.hadoop.hbase.CacheEvictionStats;
051import org.apache.hadoop.hbase.CacheEvictionStatsBuilder;
052import org.apache.hadoop.hbase.ClusterMetrics;
053import org.apache.hadoop.hbase.ClusterMetrics.Option;
054import org.apache.hadoop.hbase.ClusterMetricsBuilder;
055import org.apache.hadoop.hbase.DoNotRetryIOException;
056import org.apache.hadoop.hbase.HBaseConfiguration;
057import org.apache.hadoop.hbase.HConstants;
058import org.apache.hadoop.hbase.HRegionInfo;
059import org.apache.hadoop.hbase.HRegionLocation;
060import org.apache.hadoop.hbase.HTableDescriptor;
061import org.apache.hadoop.hbase.MasterNotRunningException;
062import org.apache.hadoop.hbase.MetaTableAccessor;
063import org.apache.hadoop.hbase.NamespaceDescriptor;
064import org.apache.hadoop.hbase.NamespaceNotFoundException;
065import org.apache.hadoop.hbase.NotServingRegionException;
066import org.apache.hadoop.hbase.RegionLocations;
067import org.apache.hadoop.hbase.RegionMetrics;
068import org.apache.hadoop.hbase.RegionMetricsBuilder;
069import org.apache.hadoop.hbase.ServerName;
070import org.apache.hadoop.hbase.TableExistsException;
071import org.apache.hadoop.hbase.TableName;
072import org.apache.hadoop.hbase.TableNotDisabledException;
073import org.apache.hadoop.hbase.TableNotFoundException;
074import org.apache.hadoop.hbase.UnknownRegionException;
075import org.apache.hadoop.hbase.ZooKeeperConnectionException;
076import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
077import org.apache.hadoop.hbase.client.replication.TableCFs;
078import org.apache.hadoop.hbase.client.security.SecurityCapability;
079import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
080import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
081import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
082import org.apache.hadoop.hbase.ipc.HBaseRpcController;
083import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
084import org.apache.hadoop.hbase.quotas.QuotaFilter;
085import org.apache.hadoop.hbase.quotas.QuotaRetriever;
086import org.apache.hadoop.hbase.quotas.QuotaSettings;
087import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
088import org.apache.hadoop.hbase.replication.ReplicationException;
089import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
090import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
091import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
092import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
093import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
094import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
095import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
096import org.apache.hadoop.hbase.util.Addressing;
097import org.apache.hadoop.hbase.util.Bytes;
098import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
099import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
100import org.apache.hadoop.hbase.util.Pair;
101import org.apache.hadoop.ipc.RemoteException;
102import org.apache.hadoop.util.StringUtils;
103import org.apache.yetus.audience.InterfaceAudience;
104import org.apache.yetus.audience.InterfaceStability;
105import org.slf4j.Logger;
106import org.slf4j.LoggerFactory;
107
108import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
109import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
110import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
111import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
112import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
113import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
114import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
115import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
116import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
117import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
147import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
148import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
149import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
150import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
153import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
208
209/**
210 * HBaseAdmin is no longer a client API. It is marked InterfaceAudience.Private indicating that
211 * this is an HBase-internal class as defined in
212 * https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html
213 * There are no guarantees for backwards source / binary compatibility and methods or class can
214 * change or go away without deprecation.
215 * Use {@link Connection#getAdmin()} to obtain an instance of {@link Admin} instead of constructing
216 * an HBaseAdmin directly.
217 *
218 * <p>Connection should be an <i>unmanaged</i> connection obtained via
219 * {@link ConnectionFactory#createConnection(Configuration)}
220 *
221 * @see ConnectionFactory
222 * @see Connection
223 * @see Admin
224 */
225@InterfaceAudience.Private
226@InterfaceStability.Evolving
227public class HBaseAdmin implements Admin {
228  private static final Logger LOG = LoggerFactory.getLogger(HBaseAdmin.class);
229
230  private ClusterConnection connection;
231
232  private final Configuration conf;
233  private final long pause;
234  private final int numRetries;
235  private final int syncWaitTimeout;
236  private boolean aborted;
237  private int operationTimeout;
238  private int rpcTimeout;
239  private int getProcedureTimeout;
240
241  private RpcRetryingCallerFactory rpcCallerFactory;
242  private RpcControllerFactory rpcControllerFactory;
243
244  private NonceGenerator ng;
245
246  @Override
247  public int getOperationTimeout() {
248    return operationTimeout;
249  }
250
251  HBaseAdmin(ClusterConnection connection) throws IOException {
252    this.conf = connection.getConfiguration();
253    this.connection = connection;
254
255    // TODO: receive ConnectionConfiguration here rather than re-parsing these configs every time.
256    this.pause = this.conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
257        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
258    this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
259        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
260    this.operationTimeout = this.conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
261        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
262    this.rpcTimeout = this.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
263        HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
264    this.syncWaitTimeout = this.conf.getInt(
265      "hbase.client.sync.wait.timeout.msec", 10 * 60000); // 10min
266    this.getProcedureTimeout =
267        this.conf.getInt("hbase.client.procedure.future.get.timeout.msec", 10 * 60000); // 10min
268
269    this.rpcCallerFactory = connection.getRpcRetryingCallerFactory();
270    this.rpcControllerFactory = connection.getRpcControllerFactory();
271
272    this.ng = this.connection.getNonceGenerator();
273  }
274
275  @Override
276  public void abort(String why, Throwable e) {
277    // Currently does nothing but throw the passed message and exception
278    this.aborted = true;
279    throw new RuntimeException(why, e);
280  }
281
282  @Override
283  public boolean isAborted() {
284    return this.aborted;
285  }
286
287  @Override
288  public boolean abortProcedure(final long procId, final boolean mayInterruptIfRunning)
289  throws IOException {
290    return get(abortProcedureAsync(procId, mayInterruptIfRunning), this.syncWaitTimeout,
291      TimeUnit.MILLISECONDS);
292  }
293
294  @Override
295  public Future<Boolean> abortProcedureAsync(final long procId, final boolean mayInterruptIfRunning)
296      throws IOException {
297    Boolean abortProcResponse =
298        executeCallable(new MasterCallable<AbortProcedureResponse>(getConnection(),
299            getRpcControllerFactory()) {
300      @Override
301      protected AbortProcedureResponse rpcCall() throws Exception {
302        AbortProcedureRequest abortProcRequest =
303            AbortProcedureRequest.newBuilder().setProcId(procId).build();
304        return master.abortProcedure(getRpcController(), abortProcRequest);
305      }
306    }).getIsProcedureAborted();
307    return new AbortProcedureFuture(this, procId, abortProcResponse);
308  }
309
310  @Override
311  public List<TableDescriptor> listTableDescriptors() throws IOException {
312    return listTableDescriptors((Pattern)null, false);
313  }
314
315  @Override
316  public List<TableDescriptor> listTableDescriptors(Pattern pattern) throws IOException {
317    return listTableDescriptors(pattern, false);
318  }
319
320  @Override
321  public List<TableDescriptor> listTableDescriptors(Pattern pattern, boolean includeSysTables)
322      throws IOException {
323    return executeCallable(new MasterCallable<List<TableDescriptor>>(getConnection(),
324        getRpcControllerFactory()) {
325      @Override
326      protected List<TableDescriptor> rpcCall() throws Exception {
327        GetTableDescriptorsRequest req =
328            RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables);
329        return ProtobufUtil.toTableDescriptorList(master.getTableDescriptors(getRpcController(),
330            req));
331      }
332    });
333  }
334
335  @Override
336  public TableDescriptor getDescriptor(TableName tableName)
337      throws TableNotFoundException, IOException {
338    return getTableDescriptor(tableName, getConnection(), rpcCallerFactory, rpcControllerFactory,
339       operationTimeout, rpcTimeout);
340  }
341
342  @Override
343  public void modifyTable(TableDescriptor td) throws IOException {
344    get(modifyTableAsync(td), syncWaitTimeout, TimeUnit.MILLISECONDS);
345  }
346
347  @Override
348  public Future<Void> modifyTableAsync(TableDescriptor td) throws IOException {
349    ModifyTableResponse response = executeCallable(
350      new MasterCallable<ModifyTableResponse>(getConnection(), getRpcControllerFactory()) {
351        Long nonceGroup = ng.getNonceGroup();
352        Long nonce = ng.newNonce();
353        @Override
354        protected ModifyTableResponse rpcCall() throws Exception {
355          setPriority(td.getTableName());
356          ModifyTableRequest request = RequestConverter.buildModifyTableRequest(
357            td.getTableName(), td, nonceGroup, nonce);
358          return master.modifyTable(getRpcController(), request);
359        }
360      });
361    return new ModifyTableFuture(this, td.getTableName(), response);
362  }
363
364  @Override
365  public List<TableDescriptor> listTableDescriptorsByNamespace(byte[] name) throws IOException {
366    return executeCallable(new MasterCallable<List<TableDescriptor>>(getConnection(),
367        getRpcControllerFactory()) {
368      @Override
369      protected List<TableDescriptor> rpcCall() throws Exception {
370        return master.listTableDescriptorsByNamespace(getRpcController(),
371                ListTableDescriptorsByNamespaceRequest.newBuilder()
372                  .setNamespaceName(Bytes.toString(name)).build())
373                .getTableSchemaList()
374                .stream()
375                .map(ProtobufUtil::toTableDescriptor)
376                .collect(Collectors.toList());
377      }
378    });
379  }
380
381  @Override
382  public List<TableDescriptor> listTableDescriptors(List<TableName> tableNames) throws IOException {
383    return executeCallable(new MasterCallable<List<TableDescriptor>>(getConnection(),
384        getRpcControllerFactory()) {
385      @Override
386      protected List<TableDescriptor> rpcCall() throws Exception {
387        GetTableDescriptorsRequest req =
388            RequestConverter.buildGetTableDescriptorsRequest(tableNames);
389          return ProtobufUtil.toTableDescriptorList(master.getTableDescriptors(getRpcController(),
390              req));
391      }
392    });
393  }
394
395  @Override
396  public List<RegionInfo> getRegions(final ServerName sn) throws IOException {
397    AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
398    // TODO: There is no timeout on this controller. Set one!
399    HBaseRpcController controller = rpcControllerFactory.newController();
400    return ProtobufUtil.getOnlineRegions(controller, admin);
401  }
402
403  @Override
404  public List<RegionInfo> getRegions(TableName tableName) throws IOException {
405    if (TableName.isMetaTableName(tableName)) {
406      return Arrays.asList(RegionInfoBuilder.FIRST_META_REGIONINFO);
407    } else {
408      return MetaTableAccessor.getTableRegions(connection, tableName, true);
409    }
410  }
411
412  private static class AbortProcedureFuture extends ProcedureFuture<Boolean> {
413    private boolean isAbortInProgress;
414
415    public AbortProcedureFuture(
416        final HBaseAdmin admin,
417        final Long procId,
418        final Boolean abortProcResponse) {
419      super(admin, procId);
420      this.isAbortInProgress = abortProcResponse;
421    }
422
423    @Override
424    public Boolean get(long timeout, TimeUnit unit)
425        throws InterruptedException, ExecutionException, TimeoutException {
426      if (!this.isAbortInProgress) {
427        return false;
428      }
429      super.get(timeout, unit);
430      return true;
431    }
432  }
433
434  /** @return Connection used by this object. */
435  @Override
436  public Connection getConnection() {
437    return connection;
438  }
439
440  @Override
441  public boolean tableExists(final TableName tableName) throws IOException {
442    return executeCallable(new RpcRetryingCallable<Boolean>() {
443      @Override
444      protected Boolean rpcCall(int callTimeout) throws Exception {
445        return MetaTableAccessor.tableExists(connection, tableName);
446      }
447    });
448  }
449
450  @Override
451  public HTableDescriptor[] listTables() throws IOException {
452    return listTables((Pattern)null, false);
453  }
454
455  @Override
456  public HTableDescriptor[] listTables(Pattern pattern) throws IOException {
457    return listTables(pattern, false);
458  }
459
460  @Override
461  public HTableDescriptor[] listTables(String regex) throws IOException {
462    return listTables(Pattern.compile(regex), false);
463  }
464
465  @Override
466  public HTableDescriptor[] listTables(final Pattern pattern, final boolean includeSysTables)
467      throws IOException {
468    return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection(),
469        getRpcControllerFactory()) {
470      @Override
471      protected HTableDescriptor[] rpcCall() throws Exception {
472        GetTableDescriptorsRequest req =
473            RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables);
474        return ProtobufUtil.toTableDescriptorList(master.getTableDescriptors(getRpcController(),
475                req)).stream().map(ImmutableHTableDescriptor::new).toArray(HTableDescriptor[]::new);
476      }
477    });
478  }
479
480  @Override
481  public HTableDescriptor[] listTables(String regex, boolean includeSysTables)
482      throws IOException {
483    return listTables(Pattern.compile(regex), includeSysTables);
484  }
485
486  @Override
487  public TableName[] listTableNames() throws IOException {
488    return listTableNames((Pattern)null, false);
489  }
490
491  @Override
492  public TableName[] listTableNames(Pattern pattern) throws IOException {
493    return listTableNames(pattern, false);
494  }
495
496  @Override
497  public TableName[] listTableNames(String regex) throws IOException {
498    return listTableNames(Pattern.compile(regex), false);
499  }
500
501  @Override
502  public TableName[] listTableNames(final Pattern pattern, final boolean includeSysTables)
503      throws IOException {
504    return executeCallable(new MasterCallable<TableName[]>(getConnection(),
505        getRpcControllerFactory()) {
506      @Override
507      protected TableName[] rpcCall() throws Exception {
508        GetTableNamesRequest req =
509            RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables);
510        return ProtobufUtil.getTableNameArray(master.getTableNames(getRpcController(), req)
511            .getTableNamesList());
512      }
513    });
514  }
515
516  @Override
517  public TableName[] listTableNames(final String regex, final boolean includeSysTables)
518      throws IOException {
519    return listTableNames(Pattern.compile(regex), includeSysTables);
520  }
521
522  @Override
523  public HTableDescriptor getTableDescriptor(final TableName tableName) throws IOException {
524    return getHTableDescriptor(tableName, getConnection(), rpcCallerFactory, rpcControllerFactory,
525       operationTimeout, rpcTimeout);
526  }
527
528  static TableDescriptor getTableDescriptor(final TableName tableName, Connection connection,
529      RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory,
530      int operationTimeout, int rpcTimeout) throws IOException {
531    if (tableName == null) return null;
532    TableDescriptor td =
533        executeCallable(new MasterCallable<TableDescriptor>(connection, rpcControllerFactory) {
534      @Override
535      protected TableDescriptor rpcCall() throws Exception {
536        GetTableDescriptorsRequest req =
537            RequestConverter.buildGetTableDescriptorsRequest(tableName);
538        GetTableDescriptorsResponse htds = master.getTableDescriptors(getRpcController(), req);
539        if (!htds.getTableSchemaList().isEmpty()) {
540          return ProtobufUtil.toTableDescriptor(htds.getTableSchemaList().get(0));
541        }
542        return null;
543      }
544    }, rpcCallerFactory, operationTimeout, rpcTimeout);
545    if (td != null) {
546      return td;
547    }
548    throw new TableNotFoundException(tableName.getNameAsString());
549  }
550
551  /**
552   * @deprecated since 2.0 version and will be removed in 3.0 version.
553   *             use {@link #getTableDescriptor(TableName,
554   *             Connection, RpcRetryingCallerFactory,RpcControllerFactory,int,int)}
555   */
556  @Deprecated
557  static HTableDescriptor getHTableDescriptor(final TableName tableName, Connection connection,
558      RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory,
559      int operationTimeout, int rpcTimeout) throws IOException {
560    if (tableName == null) {
561      return null;
562    }
563    HTableDescriptor htd =
564        executeCallable(new MasterCallable<HTableDescriptor>(connection, rpcControllerFactory) {
565          @Override
566          protected HTableDescriptor rpcCall() throws Exception {
567            GetTableDescriptorsRequest req =
568                RequestConverter.buildGetTableDescriptorsRequest(tableName);
569            GetTableDescriptorsResponse htds = master.getTableDescriptors(getRpcController(), req);
570            if (!htds.getTableSchemaList().isEmpty()) {
571              return new ImmutableHTableDescriptor(
572                  ProtobufUtil.toTableDescriptor(htds.getTableSchemaList().get(0)));
573            }
574            return null;
575          }
576        }, rpcCallerFactory, operationTimeout, rpcTimeout);
577    if (htd != null) {
578      return new ImmutableHTableDescriptor(htd);
579    }
580    throw new TableNotFoundException(tableName.getNameAsString());
581  }
582
583  private long getPauseTime(int tries) {
584    int triesCount = tries;
585    if (triesCount >= HConstants.RETRY_BACKOFF.length) {
586      triesCount = HConstants.RETRY_BACKOFF.length - 1;
587    }
588    return this.pause * HConstants.RETRY_BACKOFF[triesCount];
589  }
590
591  @Override
592  public void createTable(TableDescriptor desc)
593  throws IOException {
594    createTable(desc, null);
595  }
596
597  @Override
598  public void createTable(TableDescriptor desc, byte [] startKey,
599      byte [] endKey, int numRegions)
600  throws IOException {
601    if(numRegions < 3) {
602      throw new IllegalArgumentException("Must create at least three regions");
603    } else if(Bytes.compareTo(startKey, endKey) >= 0) {
604      throw new IllegalArgumentException("Start key must be smaller than end key");
605    }
606    if (numRegions == 3) {
607      createTable(desc, new byte[][]{startKey, endKey});
608      return;
609    }
610    byte [][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
611    if(splitKeys == null || splitKeys.length != numRegions - 1) {
612      throw new IllegalArgumentException("Unable to split key range into enough regions");
613    }
614    createTable(desc, splitKeys);
615  }
616
617  @Override
618  public void createTable(final TableDescriptor desc, byte [][] splitKeys)
619      throws IOException {
620    get(createTableAsync(desc, splitKeys), syncWaitTimeout, TimeUnit.MILLISECONDS);
621  }
622
623  @Override
624  public Future<Void> createTableAsync(final TableDescriptor desc, final byte[][] splitKeys)
625      throws IOException {
626    if (desc.getTableName() == null) {
627      throw new IllegalArgumentException("TableName cannot be null");
628    }
629    if (splitKeys != null && splitKeys.length > 0) {
630      Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
631      // Verify there are no duplicate split keys
632      byte[] lastKey = null;
633      for (byte[] splitKey : splitKeys) {
634        if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
635          throw new IllegalArgumentException(
636              "Empty split key must not be passed in the split keys.");
637        }
638        if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
639          throw new IllegalArgumentException("All split keys must be unique, " +
640            "found duplicate: " + Bytes.toStringBinary(splitKey) +
641            ", " + Bytes.toStringBinary(lastKey));
642        }
643        lastKey = splitKey;
644      }
645    }
646
647    CreateTableResponse response = executeCallable(
648      new MasterCallable<CreateTableResponse>(getConnection(), getRpcControllerFactory()) {
649        Long nonceGroup = ng.getNonceGroup();
650        Long nonce = ng.newNonce();
651        @Override
652        protected CreateTableResponse rpcCall() throws Exception {
653          setPriority(desc.getTableName());
654          CreateTableRequest request = RequestConverter.buildCreateTableRequest(
655            desc, splitKeys, nonceGroup, nonce);
656          return master.createTable(getRpcController(), request);
657        }
658      });
659    return new CreateTableFuture(this, desc, splitKeys, response);
660  }
661
662  private static class CreateTableFuture extends TableFuture<Void> {
663    private final TableDescriptor desc;
664    private final byte[][] splitKeys;
665
666    public CreateTableFuture(final HBaseAdmin admin, final TableDescriptor desc,
667        final byte[][] splitKeys, final CreateTableResponse response) {
668      super(admin, desc.getTableName(),
669              (response != null && response.hasProcId()) ? response.getProcId() : null);
670      this.splitKeys = splitKeys;
671      this.desc = desc;
672    }
673
674    @Override
675    protected TableDescriptor getTableDescriptor() {
676      return desc;
677    }
678
679    @Override
680    public String getOperationType() {
681      return "CREATE";
682    }
683
684    @Override
685    protected Void waitOperationResult(final long deadlineTs) throws IOException, TimeoutException {
686      waitForTableEnabled(deadlineTs);
687      waitForAllRegionsOnline(deadlineTs, splitKeys);
688      return null;
689    }
690  }
691
692  @Override
693  public void deleteTable(final TableName tableName) throws IOException {
694    get(deleteTableAsync(tableName), syncWaitTimeout, TimeUnit.MILLISECONDS);
695  }
696
697  @Override
698  public Future<Void> deleteTableAsync(final TableName tableName) throws IOException {
699    DeleteTableResponse response = executeCallable(
700      new MasterCallable<DeleteTableResponse>(getConnection(), getRpcControllerFactory()) {
701        Long nonceGroup = ng.getNonceGroup();
702        Long nonce = ng.newNonce();
703        @Override
704        protected DeleteTableResponse rpcCall() throws Exception {
705          setPriority(tableName);
706          DeleteTableRequest req =
707              RequestConverter.buildDeleteTableRequest(tableName, nonceGroup,nonce);
708          return master.deleteTable(getRpcController(), req);
709        }
710      });
711    return new DeleteTableFuture(this, tableName, response);
712  }
713
714  private static class DeleteTableFuture extends TableFuture<Void> {
715    public DeleteTableFuture(final HBaseAdmin admin, final TableName tableName,
716        final DeleteTableResponse response) {
717      super(admin, tableName,
718              (response != null && response.hasProcId()) ? response.getProcId() : null);
719    }
720
721    @Override
722    public String getOperationType() {
723      return "DELETE";
724    }
725
726    @Override
727    protected Void waitOperationResult(final long deadlineTs)
728        throws IOException, TimeoutException {
729      waitTableNotFound(deadlineTs);
730      return null;
731    }
732
733    @Override
734    protected Void postOperationResult(final Void result, final long deadlineTs)
735        throws IOException, TimeoutException {
736      // Delete cached information to prevent clients from using old locations
737      ((ClusterConnection) getAdmin().getConnection()).clearRegionCache(getTableName());
738      return super.postOperationResult(result, deadlineTs);
739    }
740  }
741
742  @Override
743  public HTableDescriptor[] deleteTables(String regex) throws IOException {
744    return deleteTables(Pattern.compile(regex));
745  }
746
747  /**
748   * Delete tables matching the passed in pattern and wait on completion.
749   *
750   * Warning: Use this method carefully, there is no prompting and the effect is
751   * immediate. Consider using {@link #listTables(java.util.regex.Pattern) } and
752   * {@link #deleteTable(TableName)}
753   *
754   * @param pattern The pattern to match table names against
755   * @return Table descriptors for tables that couldn't be deleted
756   * @throws IOException
757   */
758  @Override
759  public HTableDescriptor[] deleteTables(Pattern pattern) throws IOException {
760    List<HTableDescriptor> failed = new LinkedList<>();
761    for (HTableDescriptor table : listTables(pattern)) {
762      try {
763        deleteTable(table.getTableName());
764      } catch (IOException ex) {
765        LOG.info("Failed to delete table " + table.getTableName(), ex);
766        failed.add(table);
767      }
768    }
769    return failed.toArray(new HTableDescriptor[failed.size()]);
770  }
771
772  @Override
773  public void truncateTable(final TableName tableName, final boolean preserveSplits)
774      throws IOException {
775    get(truncateTableAsync(tableName, preserveSplits), syncWaitTimeout, TimeUnit.MILLISECONDS);
776  }
777
778  @Override
779  public Future<Void> truncateTableAsync(final TableName tableName, final boolean preserveSplits)
780      throws IOException {
781    TruncateTableResponse response =
782        executeCallable(new MasterCallable<TruncateTableResponse>(getConnection(),
783            getRpcControllerFactory()) {
784          Long nonceGroup = ng.getNonceGroup();
785          Long nonce = ng.newNonce();
786          @Override
787          protected TruncateTableResponse rpcCall() throws Exception {
788            setPriority(tableName);
789            LOG.info("Started truncating " + tableName);
790            TruncateTableRequest req = RequestConverter.buildTruncateTableRequest(
791              tableName, preserveSplits, nonceGroup, nonce);
792            return master.truncateTable(getRpcController(), req);
793          }
794        });
795    return new TruncateTableFuture(this, tableName, preserveSplits, response);
796  }
797
798  private static class TruncateTableFuture extends TableFuture<Void> {
799    private final boolean preserveSplits;
800
801    public TruncateTableFuture(final HBaseAdmin admin, final TableName tableName,
802        final boolean preserveSplits, final TruncateTableResponse response) {
803      super(admin, tableName,
804             (response != null && response.hasProcId()) ? response.getProcId() : null);
805      this.preserveSplits = preserveSplits;
806    }
807
808    @Override
809    public String getOperationType() {
810      return "TRUNCATE";
811    }
812
813    @Override
814    protected Void waitOperationResult(final long deadlineTs) throws IOException, TimeoutException {
815      waitForTableEnabled(deadlineTs);
816      // once the table is enabled, we know the operation is done. so we can fetch the splitKeys
817      byte[][] splitKeys = preserveSplits ? getAdmin().getTableSplits(getTableName()) : null;
818      waitForAllRegionsOnline(deadlineTs, splitKeys);
819      return null;
820    }
821  }
822
823  private byte[][] getTableSplits(final TableName tableName) throws IOException {
824    byte[][] splits = null;
825    try (RegionLocator locator = getConnection().getRegionLocator(tableName)) {
826      byte[][] startKeys = locator.getStartKeys();
827      if (startKeys.length == 1) {
828        return splits;
829      }
830      splits = new byte[startKeys.length - 1][];
831      for (int i = 1; i < startKeys.length; i++) {
832        splits[i - 1] = startKeys[i];
833      }
834    }
835    return splits;
836  }
837
838  @Override
839  public void enableTable(final TableName tableName)
840  throws IOException {
841    get(enableTableAsync(tableName), syncWaitTimeout, TimeUnit.MILLISECONDS);
842  }
843
844  @Override
845  public Future<Void> enableTableAsync(final TableName tableName) throws IOException {
846    TableName.isLegalFullyQualifiedTableName(tableName.getName());
847    EnableTableResponse response = executeCallable(
848      new MasterCallable<EnableTableResponse>(getConnection(), getRpcControllerFactory()) {
849        Long nonceGroup = ng.getNonceGroup();
850        Long nonce = ng.newNonce();
851        @Override
852        protected EnableTableResponse rpcCall() throws Exception {
853          setPriority(tableName);
854          LOG.info("Started enable of " + tableName);
855          EnableTableRequest req =
856              RequestConverter.buildEnableTableRequest(tableName, nonceGroup, nonce);
857          return master.enableTable(getRpcController(),req);
858        }
859      });
860    return new EnableTableFuture(this, tableName, response);
861  }
862
863  private static class EnableTableFuture extends TableFuture<Void> {
864    public EnableTableFuture(final HBaseAdmin admin, final TableName tableName,
865        final EnableTableResponse response) {
866      super(admin, tableName,
867              (response != null && response.hasProcId()) ? response.getProcId() : null);
868    }
869
870    @Override
871    public String getOperationType() {
872      return "ENABLE";
873    }
874
875    @Override
876    protected Void waitOperationResult(final long deadlineTs) throws IOException, TimeoutException {
877      waitForTableEnabled(deadlineTs);
878      return null;
879    }
880  }
881
882  @Override
883  public HTableDescriptor[] enableTables(String regex) throws IOException {
884    return enableTables(Pattern.compile(regex));
885  }
886
887  @Override
888  public HTableDescriptor[] enableTables(Pattern pattern) throws IOException {
889    List<HTableDescriptor> failed = new LinkedList<>();
890    for (HTableDescriptor table : listTables(pattern)) {
891      if (isTableDisabled(table.getTableName())) {
892        try {
893          enableTable(table.getTableName());
894        } catch (IOException ex) {
895          LOG.info("Failed to enable table " + table.getTableName(), ex);
896          failed.add(table);
897        }
898      }
899    }
900    return failed.toArray(new HTableDescriptor[failed.size()]);
901  }
902
903  @Override
904  public void disableTable(final TableName tableName)
905  throws IOException {
906    get(disableTableAsync(tableName), syncWaitTimeout, TimeUnit.MILLISECONDS);
907  }
908
909  @Override
910  public Future<Void> disableTableAsync(final TableName tableName) throws IOException {
911    TableName.isLegalFullyQualifiedTableName(tableName.getName());
912    DisableTableResponse response = executeCallable(
913      new MasterCallable<DisableTableResponse>(getConnection(), getRpcControllerFactory()) {
914        Long nonceGroup = ng.getNonceGroup();
915        Long nonce = ng.newNonce();
916        @Override
917        protected DisableTableResponse rpcCall() throws Exception {
918          setPriority(tableName);
919          LOG.info("Started disable of " + tableName);
920          DisableTableRequest req =
921              RequestConverter.buildDisableTableRequest(
922                tableName, nonceGroup, nonce);
923          return master.disableTable(getRpcController(), req);
924        }
925      });
926    return new DisableTableFuture(this, tableName, response);
927  }
928
929  private static class DisableTableFuture extends TableFuture<Void> {
930    public DisableTableFuture(final HBaseAdmin admin, final TableName tableName,
931        final DisableTableResponse response) {
932      super(admin, tableName,
933              (response != null && response.hasProcId()) ? response.getProcId() : null);
934    }
935
936    @Override
937    public String getOperationType() {
938      return "DISABLE";
939    }
940
941    @Override
942    protected Void waitOperationResult(long deadlineTs) throws IOException, TimeoutException {
943      waitForTableDisabled(deadlineTs);
944      return null;
945    }
946  }
947
948  @Override
949  public HTableDescriptor[] disableTables(String regex) throws IOException {
950    return disableTables(Pattern.compile(regex));
951  }
952
953  @Override
954  public HTableDescriptor[] disableTables(Pattern pattern) throws IOException {
955    List<HTableDescriptor> failed = new LinkedList<>();
956    for (HTableDescriptor table : listTables(pattern)) {
957      if (isTableEnabled(table.getTableName())) {
958        try {
959          disableTable(table.getTableName());
960        } catch (IOException ex) {
961          LOG.info("Failed to disable table " + table.getTableName(), ex);
962          failed.add(table);
963        }
964      }
965    }
966    return failed.toArray(new HTableDescriptor[failed.size()]);
967  }
968
969  @Override
970  public boolean isTableEnabled(final TableName tableName) throws IOException {
971    checkTableExists(tableName);
972    return executeCallable(new RpcRetryingCallable<Boolean>() {
973      @Override
974      protected Boolean rpcCall(int callTimeout) throws Exception {
975        TableState tableState = MetaTableAccessor.getTableState(getConnection(), tableName);
976        if (tableState == null) {
977          throw new TableNotFoundException(tableName);
978        }
979        return tableState.inStates(TableState.State.ENABLED);
980      }
981    });
982  }
983
984  @Override
985  public boolean isTableDisabled(TableName tableName) throws IOException {
986    checkTableExists(tableName);
987    return connection.isTableDisabled(tableName);
988  }
989
990  @Override
991  public boolean isTableAvailable(TableName tableName) throws IOException {
992    return connection.isTableAvailable(tableName, null);
993  }
994
995  @Override
996  public boolean isTableAvailable(TableName tableName, byte[][] splitKeys) throws IOException {
997    return connection.isTableAvailable(tableName, splitKeys);
998  }
999
1000  @Override
1001  public Pair<Integer, Integer> getAlterStatus(final TableName tableName) throws IOException {
1002    return executeCallable(new MasterCallable<Pair<Integer, Integer>>(getConnection(),
1003        getRpcControllerFactory()) {
1004      @Override
1005      protected Pair<Integer, Integer> rpcCall() throws Exception {
1006        setPriority(tableName);
1007        GetSchemaAlterStatusRequest req = RequestConverter
1008            .buildGetSchemaAlterStatusRequest(tableName);
1009        GetSchemaAlterStatusResponse ret = master.getSchemaAlterStatus(getRpcController(), req);
1010        Pair<Integer, Integer> pair = new Pair<>(ret.getYetToUpdateRegions(),
1011            ret.getTotalRegions());
1012        return pair;
1013      }
1014    });
1015  }
1016
1017  @Override
1018  public Pair<Integer, Integer> getAlterStatus(final byte[] tableName) throws IOException {
1019    return getAlterStatus(TableName.valueOf(tableName));
1020  }
1021
1022  @Override
1023  public void addColumnFamily(final TableName tableName, final ColumnFamilyDescriptor columnFamily)
1024      throws IOException {
1025    get(addColumnFamilyAsync(tableName, columnFamily), syncWaitTimeout, TimeUnit.MILLISECONDS);
1026  }
1027
1028  @Override
1029  public Future<Void> addColumnFamilyAsync(final TableName tableName,
1030      final ColumnFamilyDescriptor columnFamily) throws IOException {
1031    AddColumnResponse response =
1032        executeCallable(new MasterCallable<AddColumnResponse>(getConnection(),
1033            getRpcControllerFactory()) {
1034          Long nonceGroup = ng.getNonceGroup();
1035          Long nonce = ng.newNonce();
1036          @Override
1037          protected AddColumnResponse rpcCall() throws Exception {
1038            setPriority(tableName);
1039            AddColumnRequest req =
1040                RequestConverter.buildAddColumnRequest(tableName, columnFamily, nonceGroup, nonce);
1041            return master.addColumn(getRpcController(), req);
1042          }
1043        });
1044    return new AddColumnFamilyFuture(this, tableName, response);
1045  }
1046
1047  private static class AddColumnFamilyFuture extends ModifyTableFuture {
1048    public AddColumnFamilyFuture(final HBaseAdmin admin, final TableName tableName,
1049        final AddColumnResponse response) {
1050      super(admin, tableName, (response != null && response.hasProcId()) ? response.getProcId()
1051          : null);
1052    }
1053
1054    @Override
1055    public String getOperationType() {
1056      return "ADD_COLUMN_FAMILY";
1057    }
1058  }
1059
1060  /**
1061   * {@inheritDoc}
1062   * @deprecated Since 2.0. Will be removed in 3.0. Use
1063   *     {@link #deleteColumnFamily(TableName, byte[])} instead.
1064   */
1065  @Override
1066  @Deprecated
1067  public void deleteColumn(final TableName tableName, final byte[] columnFamily)
1068  throws IOException {
1069    deleteColumnFamily(tableName, columnFamily);
1070  }
1071
1072  @Override
1073  public void deleteColumnFamily(final TableName tableName, final byte[] columnFamily)
1074      throws IOException {
1075    get(deleteColumnFamilyAsync(tableName, columnFamily), syncWaitTimeout, TimeUnit.MILLISECONDS);
1076  }
1077
1078  @Override
1079  public Future<Void> deleteColumnFamilyAsync(final TableName tableName, final byte[] columnFamily)
1080      throws IOException {
1081    DeleteColumnResponse response =
1082        executeCallable(new MasterCallable<DeleteColumnResponse>(getConnection(),
1083            getRpcControllerFactory()) {
1084          Long nonceGroup = ng.getNonceGroup();
1085          Long nonce = ng.newNonce();
1086          @Override
1087          protected DeleteColumnResponse rpcCall() throws Exception {
1088            setPriority(tableName);
1089            DeleteColumnRequest req =
1090                RequestConverter.buildDeleteColumnRequest(tableName, columnFamily,
1091                  nonceGroup, nonce);
1092            return master.deleteColumn(getRpcController(), req);
1093          }
1094        });
1095    return new DeleteColumnFamilyFuture(this, tableName, response);
1096  }
1097
1098  private static class DeleteColumnFamilyFuture extends ModifyTableFuture {
1099    public DeleteColumnFamilyFuture(final HBaseAdmin admin, final TableName tableName,
1100        final DeleteColumnResponse response) {
1101      super(admin, tableName, (response != null && response.hasProcId()) ? response.getProcId()
1102          : null);
1103    }
1104
1105    @Override
1106    public String getOperationType() {
1107      return "DELETE_COLUMN_FAMILY";
1108    }
1109  }
1110
1111  @Override
1112  public void modifyColumnFamily(final TableName tableName,
1113      final ColumnFamilyDescriptor columnFamily) throws IOException {
1114    get(modifyColumnFamilyAsync(tableName, columnFamily), syncWaitTimeout, TimeUnit.MILLISECONDS);
1115  }
1116
1117  @Override
1118  public Future<Void> modifyColumnFamilyAsync(final TableName tableName,
1119      final ColumnFamilyDescriptor columnFamily) throws IOException {
1120    ModifyColumnResponse response =
1121        executeCallable(new MasterCallable<ModifyColumnResponse>(getConnection(),
1122            getRpcControllerFactory()) {
1123          Long nonceGroup = ng.getNonceGroup();
1124          Long nonce = ng.newNonce();
1125          @Override
1126          protected ModifyColumnResponse rpcCall() throws Exception {
1127            setPriority(tableName);
1128            ModifyColumnRequest req =
1129                RequestConverter.buildModifyColumnRequest(tableName, columnFamily,
1130                  nonceGroup, nonce);
1131            return master.modifyColumn(getRpcController(), req);
1132          }
1133        });
1134    return new ModifyColumnFamilyFuture(this, tableName, response);
1135  }
1136
1137  private static class ModifyColumnFamilyFuture extends ModifyTableFuture {
1138    public ModifyColumnFamilyFuture(final HBaseAdmin admin, final TableName tableName,
1139        final ModifyColumnResponse response) {
1140      super(admin, tableName, (response != null && response.hasProcId()) ? response.getProcId()
1141          : null);
1142    }
1143
1144    @Override
1145    public String getOperationType() {
1146      return "MODIFY_COLUMN_FAMILY";
1147    }
1148  }
1149
1150  @Deprecated
1151  @Override
1152  public void closeRegion(final String regionName, final String unused) throws IOException {
1153    unassign(Bytes.toBytes(regionName), true);
1154  }
1155
1156  @Deprecated
1157  @Override
1158  public void closeRegion(final byte [] regionName, final String unused) throws IOException {
1159    unassign(regionName, true);
1160  }
1161
1162  @Deprecated
1163  @Override
1164  public boolean closeRegionWithEncodedRegionName(final String encodedRegionName,
1165      final String unused) throws IOException {
1166    unassign(Bytes.toBytes(encodedRegionName), true);
1167    return true;
1168  }
1169
1170  @Deprecated
1171  @Override
1172  public void closeRegion(final ServerName unused, final HRegionInfo hri) throws IOException {
1173    unassign(hri.getRegionName(), true);
1174  }
1175
1176  /**
1177   * @param sn
1178   * @return List of {@link HRegionInfo}.
1179   * @throws IOException
1180   * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0
1181   *             Use {@link #getRegions(ServerName)}.
1182   */
1183  @Deprecated
1184  @Override
1185  public List<HRegionInfo> getOnlineRegions(final ServerName sn) throws IOException {
1186    return getRegions(sn).stream().map(ImmutableHRegionInfo::new).collect(Collectors.toList());
1187  }
1188
1189  @Override
1190  public void flush(final TableName tableName) throws IOException {
1191    checkTableExists(tableName);
1192    if (isTableDisabled(tableName)) {
1193      LOG.info("Table is disabled: " + tableName.getNameAsString());
1194      return;
1195    }
1196    execProcedure("flush-table-proc", tableName.getNameAsString(), new HashMap<>());
1197  }
1198
1199  @Override
1200  public void flushRegion(final byte[] regionName) throws IOException {
1201    Pair<RegionInfo, ServerName> regionServerPair = getRegion(regionName);
1202    if (regionServerPair == null) {
1203      throw new IllegalArgumentException("Unknown regionname: " + Bytes.toStringBinary(regionName));
1204    }
1205    if (regionServerPair.getSecond() == null) {
1206      throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
1207    }
1208    final RegionInfo regionInfo = regionServerPair.getFirst();
1209    ServerName serverName = regionServerPair.getSecond();
1210    flush(this.connection.getAdmin(serverName), regionInfo);
1211  }
1212
1213  private void flush(AdminService.BlockingInterface admin, final RegionInfo info)
1214    throws IOException {
1215    ProtobufUtil.call(() -> {
1216      // TODO: There is no timeout on this controller. Set one!
1217      HBaseRpcController controller = rpcControllerFactory.newController();
1218      FlushRegionRequest request =
1219        RequestConverter.buildFlushRegionRequest(info.getRegionName());
1220      admin.flushRegion(controller, request);
1221      return null;
1222    });
1223  }
1224
1225  @Override
1226  public void flushRegionServer(ServerName serverName) throws IOException {
1227    for (RegionInfo region : getRegions(serverName)) {
1228      flush(this.connection.getAdmin(serverName), region);
1229    }
1230  }
1231
1232  /**
1233   * {@inheritDoc}
1234   */
1235  @Override
1236  public void compact(final TableName tableName)
1237    throws IOException {
1238    compact(tableName, null, false, CompactType.NORMAL);
1239  }
1240
1241  @Override
1242  public void compactRegion(final byte[] regionName)
1243    throws IOException {
1244    compactRegion(regionName, null, false);
1245  }
1246
1247  /**
1248   * {@inheritDoc}
1249   */
1250  @Override
1251  public void compact(final TableName tableName, final byte[] columnFamily)
1252    throws IOException {
1253    compact(tableName, columnFamily, false, CompactType.NORMAL);
1254  }
1255
1256  /**
1257   * {@inheritDoc}
1258   */
1259  @Override
1260  public void compactRegion(final byte[] regionName, final byte[] columnFamily)
1261    throws IOException {
1262    compactRegion(regionName, columnFamily, false);
1263  }
1264
1265  @Override
1266  public void compactRegionServer(final ServerName serverName) throws IOException {
1267    for (RegionInfo region : getRegions(serverName)) {
1268      compact(this.connection.getAdmin(serverName), region, false, null);
1269    }
1270  }
1271
1272  @Override
1273  public void majorCompactRegionServer(final ServerName serverName) throws IOException {
1274    for (RegionInfo region : getRegions(serverName)) {
1275      compact(this.connection.getAdmin(serverName), region, true, null);
1276    }
1277  }
1278
1279  @Override
1280  public void majorCompact(final TableName tableName)
1281  throws IOException {
1282    compact(tableName, null, true, CompactType.NORMAL);
1283  }
1284
1285  @Override
1286  public void majorCompactRegion(final byte[] regionName)
1287  throws IOException {
1288    compactRegion(regionName, null, true);
1289  }
1290
1291  /**
1292   * {@inheritDoc}
1293   */
1294  @Override
1295  public void majorCompact(final TableName tableName, final byte[] columnFamily)
1296  throws IOException {
1297    compact(tableName, columnFamily, true, CompactType.NORMAL);
1298  }
1299
1300  @Override
1301  public void majorCompactRegion(final byte[] regionName, final byte[] columnFamily)
1302  throws IOException {
1303    compactRegion(regionName, columnFamily, true);
1304  }
1305
1306  /**
1307   * Compact a table.
1308   * Asynchronous operation.
1309   *
1310   * @param tableName table or region to compact
1311   * @param columnFamily column family within a table or region
1312   * @param major True if we are to do a major compaction.
1313   * @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
1314   * @throws IOException if a remote or network exception occurs
1315   */
1316  private void compact(final TableName tableName, final byte[] columnFamily,final boolean major,
1317                       CompactType compactType) throws IOException {
1318    switch (compactType) {
1319      case MOB:
1320        compact(this.connection.getAdminForMaster(), RegionInfo.createMobRegionInfo(tableName),
1321            major, columnFamily);
1322        break;
1323      case NORMAL:
1324        checkTableExists(tableName);
1325        for (HRegionLocation loc :connection.locateRegions(tableName, false, false)) {
1326          ServerName sn = loc.getServerName();
1327          if (sn == null) {
1328            continue;
1329          }
1330          try {
1331            compact(this.connection.getAdmin(sn), loc.getRegion(), major, columnFamily);
1332          } catch (NotServingRegionException e) {
1333            if (LOG.isDebugEnabled()) {
1334              LOG.debug("Trying to" + (major ? " major" : "") + " compact " + loc.getRegion() +
1335                  ": " + StringUtils.stringifyException(e));
1336            }
1337          }
1338        }
1339        break;
1340      default:
1341        throw new IllegalArgumentException("Unknown compactType: " + compactType);
1342    }
1343  }
1344
1345  /**
1346   * Compact an individual region.
1347   * Asynchronous operation.
1348   *
1349   * @param regionName region to compact
1350   * @param columnFamily column family within a table or region
1351   * @param major True if we are to do a major compaction.
1352   * @throws IOException if a remote or network exception occurs
1353   * @throws InterruptedException
1354   */
1355  private void compactRegion(final byte[] regionName, final byte[] columnFamily,
1356      final boolean major) throws IOException {
1357    Pair<RegionInfo, ServerName> regionServerPair = getRegion(regionName);
1358    if (regionServerPair == null) {
1359      throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName));
1360    }
1361    if (regionServerPair.getSecond() == null) {
1362      throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
1363    }
1364    compact(this.connection.getAdmin(regionServerPair.getSecond()), regionServerPair.getFirst(),
1365      major, columnFamily);
1366  }
1367
1368  private void compact(AdminService.BlockingInterface admin, RegionInfo hri, boolean major,
1369      byte[] family) throws IOException {
1370    Callable<Void> callable = new Callable<Void>() {
1371      @Override
1372      public Void call() throws Exception {
1373        // TODO: There is no timeout on this controller. Set one!
1374        HBaseRpcController controller = rpcControllerFactory.newController();
1375        CompactRegionRequest request =
1376            RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family);
1377        admin.compactRegion(controller, request);
1378        return null;
1379      }
1380    };
1381    ProtobufUtil.call(callable);
1382  }
1383
1384  @Override
1385  public void move(final byte[] encodedRegionName, final byte[] destServerName) throws IOException {
1386    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
1387      @Override
1388      protected Void rpcCall() throws Exception {
1389        setPriority(encodedRegionName);
1390        MoveRegionRequest request =
1391            RequestConverter.buildMoveRegionRequest(encodedRegionName,
1392              destServerName != null ? ServerName.valueOf(Bytes.toString(destServerName)) : null);
1393        master.moveRegion(getRpcController(), request);
1394        return null;
1395      }
1396    });
1397  }
1398
1399  @Override
1400  public void assign(final byte [] regionName) throws MasterNotRunningException,
1401      ZooKeeperConnectionException, IOException {
1402    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
1403      @Override
1404      protected Void rpcCall() throws Exception {
1405        setPriority(regionName);
1406        AssignRegionRequest request =
1407            RequestConverter.buildAssignRegionRequest(getRegionName(regionName));
1408        master.assignRegion(getRpcController(), request);
1409        return null;
1410      }
1411    });
1412  }
1413
1414  @Override
1415  public void unassign(final byte [] regionName, final boolean force) throws IOException {
1416    final byte[] toBeUnassigned = getRegionName(regionName);
1417    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
1418      @Override
1419      protected Void rpcCall() throws Exception {
1420        setPriority(regionName);
1421        UnassignRegionRequest request =
1422            RequestConverter.buildUnassignRegionRequest(toBeUnassigned, force);
1423        master.unassignRegion(getRpcController(), request);
1424        return null;
1425      }
1426    });
1427  }
1428
1429  @Override
1430  public void offline(final byte [] regionName)
1431  throws IOException {
1432    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
1433      @Override
1434      protected Void rpcCall() throws Exception {
1435        setPriority(regionName);
1436        master.offlineRegion(getRpcController(),
1437            RequestConverter.buildOfflineRegionRequest(regionName));
1438        return null;
1439      }
1440    });
1441  }
1442
1443  @Override
1444  public boolean balancerSwitch(final boolean on, final boolean synchronous)
1445  throws IOException {
1446    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1447      @Override
1448      protected Boolean rpcCall() throws Exception {
1449        SetBalancerRunningRequest req =
1450            RequestConverter.buildSetBalancerRunningRequest(on, synchronous);
1451        return master.setBalancerRunning(getRpcController(), req).getPrevBalanceValue();
1452      }
1453    });
1454  }
1455
1456  @Override
1457  public boolean balance() throws IOException {
1458    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1459      @Override
1460      protected Boolean rpcCall() throws Exception {
1461        return master.balance(getRpcController(),
1462            RequestConverter.buildBalanceRequest(false)).getBalancerRan();
1463      }
1464    });
1465  }
1466
1467  @Override
1468  public boolean balance(final boolean force) throws IOException {
1469    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1470      @Override
1471      protected Boolean rpcCall() throws Exception {
1472        return master.balance(getRpcController(),
1473            RequestConverter.buildBalanceRequest(force)).getBalancerRan();
1474      }
1475    });
1476  }
1477
1478  @Override
1479  public boolean isBalancerEnabled() throws IOException {
1480    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1481      @Override
1482      protected Boolean rpcCall() throws Exception {
1483        return master.isBalancerEnabled(getRpcController(),
1484          RequestConverter.buildIsBalancerEnabledRequest()).getEnabled();
1485      }
1486    });
1487  }
1488
1489  /**
1490   * {@inheritDoc}
1491   */
1492  @Override
1493  public CacheEvictionStats clearBlockCache(final TableName tableName) throws IOException {
1494    checkTableExists(tableName);
1495    CacheEvictionStatsBuilder cacheEvictionStats = CacheEvictionStats.builder();
1496    List<Pair<RegionInfo, ServerName>> pairs =
1497      MetaTableAccessor.getTableRegionsAndLocations(connection, tableName);
1498    Map<ServerName, List<RegionInfo>> regionInfoByServerName =
1499        pairs.stream()
1500            .filter(pair -> !(pair.getFirst().isOffline()))
1501            .filter(pair -> pair.getSecond() != null)
1502            .collect(Collectors.groupingBy(pair -> pair.getSecond(),
1503                Collectors.mapping(pair -> pair.getFirst(), Collectors.toList())));
1504
1505    for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
1506      CacheEvictionStats stats = clearBlockCache(entry.getKey(), entry.getValue());
1507      cacheEvictionStats = cacheEvictionStats.append(stats);
1508      if (stats.getExceptionCount() > 0) {
1509        for (Map.Entry<byte[], Throwable> exception : stats.getExceptions().entrySet()) {
1510          LOG.debug("Failed to clear block cache for "
1511              + Bytes.toStringBinary(exception.getKey())
1512              + " on " + entry.getKey() + ": ", exception.getValue());
1513        }
1514      }
1515    }
1516    return cacheEvictionStats.build();
1517  }
1518
1519  private CacheEvictionStats clearBlockCache(final ServerName sn, final List<RegionInfo> hris)
1520      throws IOException {
1521    HBaseRpcController controller = rpcControllerFactory.newController();
1522    AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
1523    ClearRegionBlockCacheRequest request =
1524      RequestConverter.buildClearRegionBlockCacheRequest(hris);
1525    ClearRegionBlockCacheResponse response;
1526    try {
1527      response = admin.clearRegionBlockCache(controller, request);
1528      return ProtobufUtil.toCacheEvictionStats(response.getStats());
1529    } catch (ServiceException se) {
1530      throw ProtobufUtil.getRemoteException(se);
1531    }
1532  }
1533
1534  /**
1535   * Invoke region normalizer. Can NOT run for various reasons.  Check logs.
1536   *
1537   * @return True if region normalizer ran, false otherwise.
1538   */
1539  @Override
1540  public boolean normalize() throws IOException {
1541    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1542      @Override
1543      protected Boolean rpcCall() throws Exception {
1544        return master.normalize(getRpcController(),
1545            RequestConverter.buildNormalizeRequest()).getNormalizerRan();
1546      }
1547    });
1548  }
1549
1550  @Override
1551  public boolean isNormalizerEnabled() throws IOException {
1552    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1553      @Override
1554      protected Boolean rpcCall() throws Exception {
1555        return master.isNormalizerEnabled(getRpcController(),
1556          RequestConverter.buildIsNormalizerEnabledRequest()).getEnabled();
1557      }
1558    });
1559  }
1560
1561  @Override
1562  public boolean normalizerSwitch(final boolean on) throws IOException {
1563    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1564      @Override
1565      protected Boolean rpcCall() throws Exception {
1566        SetNormalizerRunningRequest req =
1567          RequestConverter.buildSetNormalizerRunningRequest(on);
1568        return master.setNormalizerRunning(getRpcController(), req).getPrevNormalizerValue();
1569      }
1570    });
1571  }
1572
1573  @Override
1574  public boolean catalogJanitorSwitch(final boolean enable) throws IOException {
1575    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1576      @Override
1577      protected Boolean rpcCall() throws Exception {
1578        return master.enableCatalogJanitor(getRpcController(),
1579          RequestConverter.buildEnableCatalogJanitorRequest(enable)).getPrevValue();
1580      }
1581    });
1582  }
1583
1584  @Override
1585  public int runCatalogJanitor() throws IOException {
1586    return executeCallable(new MasterCallable<Integer>(getConnection(), getRpcControllerFactory()) {
1587      @Override
1588      protected Integer rpcCall() throws Exception {
1589        return master.runCatalogScan(getRpcController(),
1590          RequestConverter.buildCatalogScanRequest()).getScanResult();
1591      }
1592    });
1593  }
1594
1595  @Override
1596  public boolean isCatalogJanitorEnabled() throws IOException {
1597    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1598      @Override
1599      protected Boolean rpcCall() throws Exception {
1600        return master.isCatalogJanitorEnabled(getRpcController(),
1601          RequestConverter.buildIsCatalogJanitorEnabledRequest()).getValue();
1602      }
1603    });
1604  }
1605
1606  @Override
1607  public boolean cleanerChoreSwitch(final boolean on) throws IOException {
1608    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1609      @Override public Boolean rpcCall() throws Exception {
1610        return master.setCleanerChoreRunning(getRpcController(),
1611            RequestConverter.buildSetCleanerChoreRunningRequest(on)).getPrevValue();
1612      }
1613    });
1614  }
1615
1616  @Override
1617  public boolean runCleanerChore() throws IOException {
1618    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1619      @Override public Boolean rpcCall() throws Exception {
1620        return master.runCleanerChore(getRpcController(),
1621            RequestConverter.buildRunCleanerChoreRequest()).getCleanerChoreRan();
1622      }
1623    });
1624  }
1625
1626  @Override
1627  public boolean isCleanerChoreEnabled() throws IOException {
1628    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
1629      @Override public Boolean rpcCall() throws Exception {
1630        return master.isCleanerChoreEnabled(getRpcController(),
1631            RequestConverter.buildIsCleanerChoreEnabledRequest()).getValue();
1632      }
1633    });
1634  }
1635
1636  /**
1637   * Merge two regions. Synchronous operation.
1638   * Note: It is not feasible to predict the length of merge.
1639   *   Therefore, this is for internal testing only.
1640   * @param nameOfRegionA encoded or full name of region a
1641   * @param nameOfRegionB encoded or full name of region b
1642   * @param forcible true if do a compulsory merge, otherwise we will only merge
1643   *          two adjacent regions
1644   * @throws IOException
1645   */
1646  @VisibleForTesting
1647  public void mergeRegionsSync(
1648      final byte[] nameOfRegionA,
1649      final byte[] nameOfRegionB,
1650      final boolean forcible) throws IOException {
1651    get(
1652      mergeRegionsAsync(nameOfRegionA, nameOfRegionB, forcible),
1653      syncWaitTimeout,
1654      TimeUnit.MILLISECONDS);
1655  }
1656
1657  /**
1658   * Merge two regions. Asynchronous operation.
1659   * @param nameOfRegionA encoded or full name of region a
1660   * @param nameOfRegionB encoded or full name of region b
1661   * @param forcible true if do a compulsory merge, otherwise we will only merge
1662   *          two adjacent regions
1663   * @throws IOException
1664   * @deprecated Since 2.0. Will be removed in 3.0. Use
1665   *     {@link #mergeRegionsAsync(byte[], byte[], boolean)} instead.
1666   */
1667  @Deprecated
1668  @Override
1669  public void mergeRegions(final byte[] nameOfRegionA,
1670      final byte[] nameOfRegionB, final boolean forcible)
1671      throws IOException {
1672    mergeRegionsAsync(nameOfRegionA, nameOfRegionB, forcible);
1673  }
1674
1675  /**
1676   * Merge two regions. Asynchronous operation.
1677   * @param nameOfRegionA encoded or full name of region a
1678   * @param nameOfRegionB encoded or full name of region b
1679   * @param forcible true if do a compulsory merge, otherwise we will only merge
1680   *          two adjacent regions
1681   * @throws IOException
1682   */
1683  @Override
1684  public Future<Void> mergeRegionsAsync(
1685      final byte[] nameOfRegionA,
1686      final byte[] nameOfRegionB,
1687      final boolean forcible) throws IOException {
1688    byte[][] nameofRegionsToMerge = new byte[2][];
1689    nameofRegionsToMerge[0] = nameOfRegionA;
1690    nameofRegionsToMerge[1] = nameOfRegionB;
1691    return mergeRegionsAsync(nameofRegionsToMerge, forcible);
1692  }
1693
1694  /**
1695   * Merge two regions. Asynchronous operation.
1696   * @param nameofRegionsToMerge encoded or full name of daughter regions
1697   * @param forcible true if do a compulsory merge, otherwise we will only merge
1698   *          adjacent regions
1699   * @throws IOException
1700   */
1701  @Override
1702  public Future<Void> mergeRegionsAsync(
1703      final byte[][] nameofRegionsToMerge,
1704      final boolean forcible) throws IOException {
1705    assert(nameofRegionsToMerge.length >= 2);
1706    byte[][] encodedNameofRegionsToMerge = new byte[nameofRegionsToMerge.length][];
1707    for(int i = 0; i < nameofRegionsToMerge.length; i++) {
1708      encodedNameofRegionsToMerge[i] = HRegionInfo.isEncodedRegionName(nameofRegionsToMerge[i]) ?
1709          nameofRegionsToMerge[i] :
1710          Bytes.toBytes(HRegionInfo.encodeRegionName(nameofRegionsToMerge[i]));
1711    }
1712
1713    TableName tableName = null;
1714    Pair<RegionInfo, ServerName> pair;
1715
1716    for(int i = 0; i < nameofRegionsToMerge.length; i++) {
1717      pair = getRegion(nameofRegionsToMerge[i]);
1718
1719      if (pair != null) {
1720        if (pair.getFirst().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1721          throw new IllegalArgumentException ("Can't invoke merge on non-default regions directly");
1722        }
1723        if (tableName == null) {
1724          tableName = pair.getFirst().getTable();
1725        } else  if (!tableName.equals(pair.getFirst().getTable())) {
1726          throw new IllegalArgumentException ("Cannot merge regions from two different tables " +
1727              tableName + " and " + pair.getFirst().getTable());
1728        }
1729      } else {
1730        throw new UnknownRegionException (
1731          "Can't invoke merge on unknown region "
1732          + Bytes.toStringBinary(encodedNameofRegionsToMerge[i]));
1733      }
1734    }
1735
1736    MergeTableRegionsResponse response =
1737        executeCallable(new MasterCallable<MergeTableRegionsResponse>(getConnection(),
1738            getRpcControllerFactory()) {
1739          Long nonceGroup = ng.getNonceGroup();
1740          Long nonce = ng.newNonce();
1741      @Override
1742      protected MergeTableRegionsResponse rpcCall() throws Exception {
1743        MergeTableRegionsRequest request = RequestConverter
1744            .buildMergeTableRegionsRequest(
1745                encodedNameofRegionsToMerge,
1746                forcible,
1747                nonceGroup,
1748                nonce);
1749        return master.mergeTableRegions(getRpcController(), request);
1750      }
1751    });
1752    return new MergeTableRegionsFuture(this, tableName, response);
1753  }
1754
1755  private static class MergeTableRegionsFuture extends TableFuture<Void> {
1756    public MergeTableRegionsFuture(
1757        final HBaseAdmin admin,
1758        final TableName tableName,
1759        final MergeTableRegionsResponse response) {
1760      super(admin, tableName,
1761          (response != null && response.hasProcId()) ? response.getProcId() : null);
1762    }
1763
1764    public MergeTableRegionsFuture(
1765        final HBaseAdmin admin,
1766        final TableName tableName,
1767        final Long procId) {
1768      super(admin, tableName, procId);
1769    }
1770
1771    @Override
1772    public String getOperationType() {
1773      return "MERGE_REGIONS";
1774    }
1775  }
1776  /**
1777   * Split one region. Synchronous operation.
1778   * Note: It is not feasible to predict the length of split.
1779   *   Therefore, this is for internal testing only.
1780   * @param regionName encoded or full name of region
1781   * @param splitPoint key where region splits
1782   * @throws IOException
1783   */
1784  @VisibleForTesting
1785  public void splitRegionSync(byte[] regionName, byte[] splitPoint) throws IOException {
1786    splitRegionSync(regionName, splitPoint, syncWaitTimeout, TimeUnit.MILLISECONDS);
1787  }
1788
1789
1790  /**
1791   * Split one region. Synchronous operation.
1792   * @param regionName region to be split
1793   * @param splitPoint split point
1794   * @param timeout how long to wait on split
1795   * @param units time units
1796   * @throws IOException
1797   */
1798  public void splitRegionSync(byte[] regionName, byte[] splitPoint,
1799    final long timeout, final TimeUnit units) throws IOException {
1800    get(
1801        splitRegionAsync(regionName, splitPoint),
1802        timeout,
1803        units);
1804  }
1805
1806  @Override
1807  public Future<Void> splitRegionAsync(byte[] regionName, byte[] splitPoint)
1808      throws IOException {
1809    byte[] encodedNameofRegionToSplit = HRegionInfo.isEncodedRegionName(regionName) ?
1810        regionName : Bytes.toBytes(HRegionInfo.encodeRegionName(regionName));
1811    Pair<RegionInfo, ServerName> pair = getRegion(regionName);
1812    if (pair != null) {
1813      if (pair.getFirst() != null &&
1814          pair.getFirst().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1815        throw new IllegalArgumentException ("Can't invoke split on non-default regions directly");
1816      }
1817    } else {
1818      throw new UnknownRegionException (
1819          "Can't invoke merge on unknown region "
1820              + Bytes.toStringBinary(encodedNameofRegionToSplit));
1821    }
1822
1823    return splitRegionAsync(pair.getFirst(), splitPoint);
1824  }
1825
1826  Future<Void> splitRegionAsync(RegionInfo hri, byte[] splitPoint) throws IOException {
1827    TableName tableName = hri.getTable();
1828    if (hri.getStartKey() != null && splitPoint != null &&
1829        Bytes.compareTo(hri.getStartKey(), splitPoint) == 0) {
1830      throw new IOException("should not give a splitkey which equals to startkey!");
1831    }
1832
1833    SplitTableRegionResponse response = executeCallable(
1834        new MasterCallable<SplitTableRegionResponse>(getConnection(), getRpcControllerFactory()) {
1835          Long nonceGroup = ng.getNonceGroup();
1836          Long nonce = ng.newNonce();
1837          @Override
1838          protected SplitTableRegionResponse rpcCall() throws Exception {
1839            setPriority(tableName);
1840            SplitTableRegionRequest request = RequestConverter
1841                .buildSplitTableRegionRequest(hri, splitPoint, nonceGroup, nonce);
1842            return master.splitRegion(getRpcController(), request);
1843          }
1844        });
1845    return new SplitTableRegionFuture(this, tableName, response);
1846  }
1847
1848  private static class SplitTableRegionFuture extends TableFuture<Void> {
1849    public SplitTableRegionFuture(final HBaseAdmin admin,
1850        final TableName tableName,
1851        final SplitTableRegionResponse response) {
1852      super(admin, tableName,
1853          (response != null && response.hasProcId()) ? response.getProcId() : null);
1854    }
1855
1856    public SplitTableRegionFuture(
1857        final HBaseAdmin admin,
1858        final TableName tableName,
1859        final Long procId) {
1860      super(admin, tableName, procId);
1861    }
1862
1863    @Override
1864    public String getOperationType() {
1865      return "SPLIT_REGION";
1866    }
1867  }
1868
1869  @Override
1870  public void split(final TableName tableName) throws IOException {
1871    split(tableName, null);
1872  }
1873
1874  @Override
1875  public void splitRegion(final byte[] regionName) throws IOException {
1876    splitRegion(regionName, null);
1877  }
1878
1879  @Override
1880  public void split(final TableName tableName, final byte[] splitPoint) throws IOException {
1881    checkTableExists(tableName);
1882    for (HRegionLocation loc : connection.locateRegions(tableName, false, false)) {
1883      ServerName sn = loc.getServerName();
1884      if (sn == null) {
1885        continue;
1886      }
1887      RegionInfo r = loc.getRegion();
1888      // check for parents
1889      if (r.isSplitParent()) {
1890        continue;
1891      }
1892      // if a split point given, only split that particular region
1893      if (r.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID ||
1894          (splitPoint != null && !r.containsRow(splitPoint))) {
1895        continue;
1896      }
1897      // call out to master to do split now
1898      splitRegionAsync(r, splitPoint);
1899    }
1900  }
1901
1902  @Override
1903  public void splitRegion(final byte[] regionName, final byte [] splitPoint) throws IOException {
1904    Pair<RegionInfo, ServerName> regionServerPair = getRegion(regionName);
1905    if (regionServerPair == null) {
1906      throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName));
1907    }
1908    if (regionServerPair.getFirst() != null &&
1909        regionServerPair.getFirst().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1910      throw new IllegalArgumentException("Can't split replicas directly. "
1911          + "Replicas are auto-split when their primary is split.");
1912    }
1913    if (regionServerPair.getSecond() == null) {
1914      throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
1915    }
1916    splitRegionAsync(regionServerPair.getFirst(), splitPoint);
1917  }
1918
1919  @Override
1920  public void modifyTable(final TableName tableName, final TableDescriptor td)
1921      throws IOException {
1922    get(modifyTableAsync(tableName, td), syncWaitTimeout, TimeUnit.MILLISECONDS);
1923  }
1924
1925  @Override
1926  public Future<Void> modifyTableAsync(final TableName tableName, final TableDescriptor td)
1927      throws IOException {
1928    if (!tableName.equals(td.getTableName())) {
1929      throw new IllegalArgumentException("the specified table name '" + tableName +
1930        "' doesn't match with the HTD one: " + td.getTableName());
1931    }
1932    return modifyTableAsync(td);
1933  }
1934
1935  private static class ModifyTableFuture extends TableFuture<Void> {
1936    public ModifyTableFuture(final HBaseAdmin admin, final TableName tableName,
1937        final ModifyTableResponse response) {
1938      super(admin, tableName,
1939          (response != null && response.hasProcId()) ? response.getProcId() : null);
1940    }
1941
1942    public ModifyTableFuture(final HBaseAdmin admin, final TableName tableName, final Long procId) {
1943      super(admin, tableName, procId);
1944    }
1945
1946    @Override
1947    public String getOperationType() {
1948      return "MODIFY";
1949    }
1950
1951    @Override
1952    protected Void postOperationResult(final Void result, final long deadlineTs)
1953        throws IOException, TimeoutException {
1954      // The modify operation on the table is asynchronous on the server side irrespective
1955      // of whether Procedure V2 is supported or not. So, we wait in the client till
1956      // all regions get updated.
1957      waitForSchemaUpdate(deadlineTs);
1958      return result;
1959    }
1960  }
1961
1962  /**
1963   * @param regionName Name of a region.
1964   * @return a pair of HRegionInfo and ServerName if <code>regionName</code> is
1965   *  a verified region name (we call {@link
1966   *  MetaTableAccessor#getRegionLocation(Connection, byte[])}
1967   *  else null.
1968   * Throw IllegalArgumentException if <code>regionName</code> is null.
1969   * @throws IOException
1970   */
1971  Pair<RegionInfo, ServerName> getRegion(final byte[] regionName) throws IOException {
1972    if (regionName == null) {
1973      throw new IllegalArgumentException("Pass a table name or region name");
1974    }
1975    Pair<RegionInfo, ServerName> pair = MetaTableAccessor.getRegion(connection, regionName);
1976    if (pair == null) {
1977      final AtomicReference<Pair<RegionInfo, ServerName>> result = new AtomicReference<>(null);
1978      final String encodedName = Bytes.toString(regionName);
1979      MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
1980        @Override
1981        public boolean visit(Result data) throws IOException {
1982          RegionInfo info = MetaTableAccessor.getRegionInfo(data);
1983          if (info == null) {
1984            LOG.warn("No serialized HRegionInfo in " + data);
1985            return true;
1986          }
1987          RegionLocations rl = MetaTableAccessor.getRegionLocations(data);
1988          boolean matched = false;
1989          ServerName sn = null;
1990          if (rl != null) {
1991            for (HRegionLocation h : rl.getRegionLocations()) {
1992              if (h != null && encodedName.equals(h.getRegionInfo().getEncodedName())) {
1993                sn = h.getServerName();
1994                info = h.getRegionInfo();
1995                matched = true;
1996              }
1997            }
1998          }
1999          if (!matched) return true;
2000          result.set(new Pair<>(info, sn));
2001          return false; // found the region, stop
2002        }
2003      };
2004
2005      MetaTableAccessor.fullScanRegions(connection, visitor);
2006      pair = result.get();
2007    }
2008    return pair;
2009  }
2010
2011  /**
2012   * If the input is a region name, it is returned as is. If it's an
2013   * encoded region name, the corresponding region is found from meta
2014   * and its region name is returned. If we can't find any region in
2015   * meta matching the input as either region name or encoded region
2016   * name, the input is returned as is. We don't throw unknown
2017   * region exception.
2018   */
2019  private byte[] getRegionName(
2020      final byte[] regionNameOrEncodedRegionName) throws IOException {
2021    if (Bytes.equals(regionNameOrEncodedRegionName,
2022        HRegionInfo.FIRST_META_REGIONINFO.getRegionName())
2023          || Bytes.equals(regionNameOrEncodedRegionName,
2024            HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
2025      return HRegionInfo.FIRST_META_REGIONINFO.getRegionName();
2026    }
2027    byte[] tmp = regionNameOrEncodedRegionName;
2028    Pair<RegionInfo, ServerName> regionServerPair = getRegion(regionNameOrEncodedRegionName);
2029    if (regionServerPair != null && regionServerPair.getFirst() != null) {
2030      tmp = regionServerPair.getFirst().getRegionName();
2031    }
2032    return tmp;
2033  }
2034
2035  /**
2036   * Check if table exists or not
2037   * @param tableName Name of a table.
2038   * @return tableName instance
2039   * @throws IOException if a remote or network exception occurs.
2040   * @throws TableNotFoundException if table does not exist.
2041   */
2042  private TableName checkTableExists(final TableName tableName)
2043      throws IOException {
2044    return executeCallable(new RpcRetryingCallable<TableName>() {
2045      @Override
2046      protected TableName rpcCall(int callTimeout) throws Exception {
2047        if (!MetaTableAccessor.tableExists(connection, tableName)) {
2048          throw new TableNotFoundException(tableName);
2049        }
2050        return tableName;
2051      }
2052    });
2053  }
2054
2055  @Override
2056  public synchronized void shutdown() throws IOException {
2057    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
2058      @Override
2059      protected Void rpcCall() throws Exception {
2060        setPriority(HConstants.HIGH_QOS);
2061        master.shutdown(getRpcController(), ShutdownRequest.newBuilder().build());
2062        return null;
2063      }
2064    });
2065  }
2066
2067  @Override
2068  public synchronized void stopMaster() throws IOException {
2069    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
2070      @Override
2071      protected Void rpcCall() throws Exception {
2072        setPriority(HConstants.HIGH_QOS);
2073        master.stopMaster(getRpcController(), StopMasterRequest.newBuilder().build());
2074        return null;
2075      }
2076    });
2077  }
2078
2079  @Override
2080  public synchronized void stopRegionServer(final String hostnamePort)
2081  throws IOException {
2082    String hostname = Addressing.parseHostname(hostnamePort);
2083    int port = Addressing.parsePort(hostnamePort);
2084    final AdminService.BlockingInterface admin =
2085      this.connection.getAdmin(ServerName.valueOf(hostname, port, 0));
2086    // TODO: There is no timeout on this controller. Set one!
2087    HBaseRpcController controller = rpcControllerFactory.newController();
2088    controller.setPriority(HConstants.HIGH_QOS);
2089    StopServerRequest request = RequestConverter.buildStopServerRequest(
2090        "Called by admin client " + this.connection.toString());
2091    try {
2092      admin.stopServer(controller, request);
2093    } catch (Exception e) {
2094      throw ProtobufUtil.handleRemoteException(e);
2095    }
2096  }
2097
2098  @Override
2099  public boolean isMasterInMaintenanceMode() throws IOException {
2100    return executeCallable(new MasterCallable<IsInMaintenanceModeResponse>(getConnection(),
2101        this.rpcControllerFactory) {
2102      @Override
2103      protected IsInMaintenanceModeResponse rpcCall() throws Exception {
2104        return master.isMasterInMaintenanceMode(getRpcController(),
2105            IsInMaintenanceModeRequest.newBuilder().build());
2106      }
2107    }).getInMaintenanceMode();
2108  }
2109
2110  @Override
2111  public ClusterMetrics getClusterMetrics(EnumSet<Option> options) throws IOException {
2112    return executeCallable(new MasterCallable<ClusterMetrics>(getConnection(),
2113        this.rpcControllerFactory) {
2114      @Override
2115      protected ClusterMetrics rpcCall() throws Exception {
2116        GetClusterStatusRequest req = RequestConverter.buildGetClusterStatusRequest(options);
2117        return ClusterMetricsBuilder.toClusterMetrics(
2118          master.getClusterStatus(getRpcController(), req).getClusterStatus());
2119      }
2120    });
2121  }
2122
2123  @Override
2124  public List<RegionMetrics> getRegionMetrics(ServerName serverName, TableName tableName)
2125      throws IOException {
2126    AdminService.BlockingInterface admin = this.connection.getAdmin(serverName);
2127    HBaseRpcController controller = rpcControllerFactory.newController();
2128    AdminProtos.GetRegionLoadRequest request =
2129      RequestConverter.buildGetRegionLoadRequest(tableName);
2130    try {
2131      return admin.getRegionLoad(controller, request).getRegionLoadsList().stream()
2132        .map(RegionMetricsBuilder::toRegionMetrics).collect(Collectors.toList());
2133    } catch (ServiceException se) {
2134      throw ProtobufUtil.getRemoteException(se);
2135    }
2136  }
2137
2138  @Override
2139  public Configuration getConfiguration() {
2140    return this.conf;
2141  }
2142
2143  /**
2144   * Do a get with a timeout against the passed in <code>future</code>.
2145   */
2146  private static <T> T get(final Future<T> future, final long timeout, final TimeUnit units)
2147  throws IOException {
2148    try {
2149      // TODO: how long should we wait? Spin forever?
2150      return future.get(timeout, units);
2151    } catch (InterruptedException e) {
2152      throw new InterruptedIOException("Interrupt while waiting on " + future);
2153    } catch (TimeoutException e) {
2154      throw new TimeoutIOException(e);
2155    } catch (ExecutionException e) {
2156      if (e.getCause() instanceof IOException) {
2157        throw (IOException)e.getCause();
2158      } else {
2159        throw new IOException(e.getCause());
2160      }
2161    }
2162  }
2163
2164  @Override
2165  public void createNamespace(final NamespaceDescriptor descriptor)
2166  throws IOException {
2167    get(createNamespaceAsync(descriptor), this.syncWaitTimeout, TimeUnit.MILLISECONDS);
2168  }
2169
2170  @Override
2171  public Future<Void> createNamespaceAsync(final NamespaceDescriptor descriptor)
2172      throws IOException {
2173    CreateNamespaceResponse response =
2174        executeCallable(new MasterCallable<CreateNamespaceResponse>(getConnection(),
2175            getRpcControllerFactory()) {
2176      @Override
2177      protected CreateNamespaceResponse rpcCall() throws Exception {
2178        return master.createNamespace(getRpcController(),
2179          CreateNamespaceRequest.newBuilder().setNamespaceDescriptor(ProtobufUtil.
2180              toProtoNamespaceDescriptor(descriptor)).build());
2181      }
2182    });
2183    return new NamespaceFuture(this, descriptor.getName(), response.getProcId()) {
2184      @Override
2185      public String getOperationType() {
2186        return "CREATE_NAMESPACE";
2187      }
2188    };
2189  }
2190
2191  @Override
2192  public void modifyNamespace(final NamespaceDescriptor descriptor)
2193  throws IOException {
2194    get(modifyNamespaceAsync(descriptor), this.syncWaitTimeout, TimeUnit.MILLISECONDS);
2195  }
2196
2197  @Override
2198  public Future<Void> modifyNamespaceAsync(final NamespaceDescriptor descriptor)
2199      throws IOException {
2200    ModifyNamespaceResponse response =
2201        executeCallable(new MasterCallable<ModifyNamespaceResponse>(getConnection(),
2202            getRpcControllerFactory()) {
2203      @Override
2204      protected ModifyNamespaceResponse rpcCall() throws Exception {
2205        // TODO: set priority based on NS?
2206        return master.modifyNamespace(getRpcController(), ModifyNamespaceRequest.newBuilder().
2207          setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build());
2208       }
2209    });
2210    return new NamespaceFuture(this, descriptor.getName(), response.getProcId()) {
2211      @Override
2212      public String getOperationType() {
2213        return "MODIFY_NAMESPACE";
2214      }
2215    };
2216  }
2217
2218  @Override
2219  public void deleteNamespace(final String name)
2220  throws IOException {
2221    get(deleteNamespaceAsync(name), this.syncWaitTimeout, TimeUnit.MILLISECONDS);
2222  }
2223
2224  @Override
2225  public Future<Void> deleteNamespaceAsync(final String name)
2226      throws IOException {
2227    DeleteNamespaceResponse response =
2228        executeCallable(new MasterCallable<DeleteNamespaceResponse>(getConnection(),
2229            getRpcControllerFactory()) {
2230      @Override
2231      protected DeleteNamespaceResponse rpcCall() throws Exception {
2232        // TODO: set priority based on NS?
2233        return master.deleteNamespace(getRpcController(), DeleteNamespaceRequest.newBuilder().
2234          setNamespaceName(name).build());
2235        }
2236      });
2237    return new NamespaceFuture(this, name, response.getProcId()) {
2238      @Override
2239      public String getOperationType() {
2240        return "DELETE_NAMESPACE";
2241      }
2242    };
2243  }
2244
2245  @Override
2246  public NamespaceDescriptor getNamespaceDescriptor(final String name)
2247      throws NamespaceNotFoundException, IOException {
2248    return executeCallable(new MasterCallable<NamespaceDescriptor>(getConnection(),
2249        getRpcControllerFactory()) {
2250      @Override
2251      protected NamespaceDescriptor rpcCall() throws Exception {
2252        return ProtobufUtil.toNamespaceDescriptor(
2253            master.getNamespaceDescriptor(getRpcController(),
2254                GetNamespaceDescriptorRequest.newBuilder().
2255                  setNamespaceName(name).build()).getNamespaceDescriptor());
2256      }
2257    });
2258  }
2259
2260  @Override
2261  public NamespaceDescriptor[] listNamespaceDescriptors() throws IOException {
2262    return executeCallable(new MasterCallable<NamespaceDescriptor[]>(getConnection(),
2263        getRpcControllerFactory()) {
2264      @Override
2265      protected NamespaceDescriptor[] rpcCall() throws Exception {
2266        List<HBaseProtos.NamespaceDescriptor> list =
2267            master.listNamespaceDescriptors(getRpcController(),
2268              ListNamespaceDescriptorsRequest.newBuilder().build()).getNamespaceDescriptorList();
2269        NamespaceDescriptor[] res = new NamespaceDescriptor[list.size()];
2270        for(int i = 0; i < list.size(); i++) {
2271          res[i] = ProtobufUtil.toNamespaceDescriptor(list.get(i));
2272        }
2273        return res;
2274      }
2275    });
2276  }
2277
2278  @Override
2279  public String getProcedures() throws IOException {
2280    return executeCallable(new MasterCallable<String>(getConnection(),
2281        getRpcControllerFactory()) {
2282      @Override
2283      protected String rpcCall() throws Exception {
2284        GetProceduresRequest request = GetProceduresRequest.newBuilder().build();
2285        GetProceduresResponse response = master.getProcedures(getRpcController(), request);
2286        return ProtobufUtil.toProcedureJson(response.getProcedureList());
2287      }
2288    });
2289  }
2290
2291  @Override
2292  public String getLocks() throws IOException {
2293    return executeCallable(new MasterCallable<String>(getConnection(),
2294        getRpcControllerFactory()) {
2295      @Override
2296      protected String rpcCall() throws Exception {
2297        GetLocksRequest request = GetLocksRequest.newBuilder().build();
2298        GetLocksResponse response = master.getLocks(getRpcController(), request);
2299        return ProtobufUtil.toLockJson(response.getLockList());
2300      }
2301    });
2302  }
2303
2304  @Override
2305  public HTableDescriptor[] listTableDescriptorsByNamespace(final String name) throws IOException {
2306    return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection(),
2307        getRpcControllerFactory()) {
2308      @Override
2309      protected HTableDescriptor[] rpcCall() throws Exception {
2310        List<TableSchema> list =
2311            master.listTableDescriptorsByNamespace(getRpcController(),
2312                ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name)
2313                .build()).getTableSchemaList();
2314        HTableDescriptor[] res = new HTableDescriptor[list.size()];
2315        for(int i=0; i < list.size(); i++) {
2316          res[i] = new ImmutableHTableDescriptor(ProtobufUtil.toTableDescriptor(list.get(i)));
2317        }
2318        return res;
2319      }
2320    });
2321  }
2322
2323  @Override
2324  public TableName[] listTableNamesByNamespace(final String name) throws IOException {
2325    return executeCallable(new MasterCallable<TableName[]>(getConnection(),
2326        getRpcControllerFactory()) {
2327      @Override
2328      protected TableName[] rpcCall() throws Exception {
2329        List<HBaseProtos.TableName> tableNames =
2330            master.listTableNamesByNamespace(getRpcController(), ListTableNamesByNamespaceRequest.
2331                newBuilder().setNamespaceName(name).build())
2332            .getTableNameList();
2333        TableName[] result = new TableName[tableNames.size()];
2334        for (int i = 0; i < tableNames.size(); i++) {
2335          result[i] = ProtobufUtil.toTableName(tableNames.get(i));
2336        }
2337        return result;
2338      }
2339    });
2340  }
2341
2342  /**
2343   * Is HBase available? Throw an exception if not.
2344   * @param conf system configuration
2345   * @throws MasterNotRunningException if the master is not running.
2346   * @throws ZooKeeperConnectionException if unable to connect to zookeeper. // TODO do not expose
2347   *           ZKConnectionException.
2348   */
2349  public static void available(final Configuration conf)
2350      throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
2351    Configuration copyOfConf = HBaseConfiguration.create(conf);
2352    // We set it to make it fail as soon as possible if HBase is not available
2353    copyOfConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
2354    copyOfConf.setInt("zookeeper.recovery.retry", 0);
2355
2356    // Check ZK first.
2357    // If the connection exists, we may have a connection to ZK that does not work anymore
2358    try (ClusterConnection connection =
2359        (ClusterConnection) ConnectionFactory.createConnection(copyOfConf)) {
2360      // can throw MasterNotRunningException
2361      connection.isMasterRunning();
2362    }
2363  }
2364
2365  /**
2366   *
2367   * @param tableName
2368   * @return List of {@link HRegionInfo}.
2369   * @throws IOException
2370   * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0
2371   *             Use {@link #getRegions(TableName)}.
2372   */
2373  @Deprecated
2374  @Override
2375  public List<HRegionInfo> getTableRegions(final TableName tableName)
2376    throws IOException {
2377    return getRegions(tableName).stream()
2378        .map(ImmutableHRegionInfo::new)
2379        .collect(Collectors.toList());
2380  }
2381
2382  @Override
2383  public synchronized void close() throws IOException {
2384  }
2385
2386  @Override
2387  public HTableDescriptor[] getTableDescriptorsByTableName(final List<TableName> tableNames)
2388  throws IOException {
2389    return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection(),
2390        getRpcControllerFactory()) {
2391      @Override
2392      protected HTableDescriptor[] rpcCall() throws Exception {
2393        GetTableDescriptorsRequest req =
2394            RequestConverter.buildGetTableDescriptorsRequest(tableNames);
2395        return ProtobufUtil
2396            .toTableDescriptorList(master.getTableDescriptors(getRpcController(), req)).stream()
2397            .map(ImmutableHTableDescriptor::new).toArray(HTableDescriptor[]::new);
2398      }
2399    });
2400  }
2401
2402  @Override
2403  public HTableDescriptor[] getTableDescriptors(List<String> names)
2404  throws IOException {
2405    List<TableName> tableNames = new ArrayList<>(names.size());
2406    for(String name : names) {
2407      tableNames.add(TableName.valueOf(name));
2408    }
2409    return getTableDescriptorsByTableName(tableNames);
2410  }
2411
2412  private RollWALWriterResponse rollWALWriterImpl(final ServerName sn) throws IOException,
2413      FailedLogCloseException {
2414    final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
2415    RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest();
2416    // TODO: There is no timeout on this controller. Set one!
2417    HBaseRpcController controller = rpcControllerFactory.newController();
2418    try {
2419      return admin.rollWALWriter(controller, request);
2420    } catch (ServiceException e) {
2421      throw ProtobufUtil.handleRemoteException(e);
2422    }
2423  }
2424
2425  /**
2426   * Roll the log writer. I.e. when using a file system based write ahead log,
2427   * start writing log messages to a new file.
2428   *
2429   * Note that when talking to a version 1.0+ HBase deployment, the rolling is asynchronous.
2430   * This method will return as soon as the roll is requested and the return value will
2431   * always be null. Additionally, the named region server may schedule store flushes at the
2432   * request of the wal handling the roll request.
2433   *
2434   * When talking to a 0.98 or older HBase deployment, the rolling is synchronous and the
2435   * return value may be either null or a list of encoded region names.
2436   *
2437   * @param serverName
2438   *          The servername of the regionserver. A server name is made of host,
2439   *          port and startcode. This is mandatory. Here is an example:
2440   *          <code> host187.example.com,60020,1289493121758</code>
2441   * @return a set of {@link HRegionInfo#getEncodedName()} that would allow the wal to
2442   *         clean up some underlying files. null if there's nothing to flush.
2443   * @throws IOException if a remote or network exception occurs
2444   * @throws FailedLogCloseException
2445   * @deprecated use {@link #rollWALWriter(ServerName)}
2446   */
2447  @Deprecated
2448  public synchronized byte[][] rollHLogWriter(String serverName)
2449      throws IOException, FailedLogCloseException {
2450    ServerName sn = ServerName.valueOf(serverName);
2451    final RollWALWriterResponse response = rollWALWriterImpl(sn);
2452    int regionCount = response.getRegionToFlushCount();
2453    if (0 == regionCount) {
2454      return null;
2455    }
2456    byte[][] regionsToFlush = new byte[regionCount][];
2457    for (int i = 0; i < regionCount; i++) {
2458      regionsToFlush[i] = ProtobufUtil.toBytes(response.getRegionToFlush(i));
2459    }
2460    return regionsToFlush;
2461  }
2462
2463  @Override
2464  public synchronized void rollWALWriter(ServerName serverName)
2465      throws IOException, FailedLogCloseException {
2466    rollWALWriterImpl(serverName);
2467  }
2468
2469  @Override
2470  public CompactionState getCompactionState(final TableName tableName)
2471  throws IOException {
2472    return getCompactionState(tableName, CompactType.NORMAL);
2473  }
2474
2475  @Override
2476  public CompactionState getCompactionStateForRegion(final byte[] regionName)
2477  throws IOException {
2478    final Pair<RegionInfo, ServerName> regionServerPair = getRegion(regionName);
2479    if (regionServerPair == null) {
2480      throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName));
2481    }
2482    if (regionServerPair.getSecond() == null) {
2483      throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
2484    }
2485    ServerName sn = regionServerPair.getSecond();
2486    final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
2487    // TODO: There is no timeout on this controller. Set one!
2488    HBaseRpcController controller = rpcControllerFactory.newController();
2489    GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
2490      regionServerPair.getFirst().getRegionName(), true);
2491    GetRegionInfoResponse response;
2492    try {
2493      response = admin.getRegionInfo(controller, request);
2494    } catch (ServiceException e) {
2495      throw ProtobufUtil.handleRemoteException(e);
2496    }
2497    if (response.getCompactionState() != null) {
2498      return ProtobufUtil.createCompactionState(response.getCompactionState());
2499    }
2500    return null;
2501  }
2502
2503  @Override
2504  public void snapshot(final String snapshotName,
2505                       final TableName tableName) throws IOException,
2506      SnapshotCreationException, IllegalArgumentException {
2507    snapshot(snapshotName, tableName, SnapshotType.FLUSH);
2508  }
2509
2510  @Override
2511  public void snapshot(final byte[] snapshotName, final TableName tableName)
2512      throws IOException, SnapshotCreationException, IllegalArgumentException {
2513    snapshot(Bytes.toString(snapshotName), tableName, SnapshotType.FLUSH);
2514  }
2515
2516  @Override
2517  public void snapshot(final String snapshotName, final TableName tableName,
2518      SnapshotType type)
2519      throws IOException, SnapshotCreationException, IllegalArgumentException {
2520    snapshot(new SnapshotDescription(snapshotName, tableName, type));
2521  }
2522
2523  @Override
2524  public void snapshot(SnapshotDescription snapshotDesc)
2525      throws IOException, SnapshotCreationException, IllegalArgumentException {
2526    // actually take the snapshot
2527    SnapshotProtos.SnapshotDescription snapshot =
2528      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
2529    SnapshotResponse response = asyncSnapshot(snapshot);
2530    final IsSnapshotDoneRequest request =
2531        IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build();
2532    IsSnapshotDoneResponse done = null;
2533    long start = EnvironmentEdgeManager.currentTime();
2534    long max = response.getExpectedTimeout();
2535    long maxPauseTime = max / this.numRetries;
2536    int tries = 0;
2537    LOG.debug("Waiting a max of " + max + " ms for snapshot '" +
2538        ClientSnapshotDescriptionUtils.toString(snapshot) + "'' to complete. (max " +
2539        maxPauseTime + " ms per retry)");
2540    while (tries == 0
2541        || ((EnvironmentEdgeManager.currentTime() - start) < max && !done.getDone())) {
2542      try {
2543        // sleep a backoff <= pauseTime amount
2544        long sleep = getPauseTime(tries++);
2545        sleep = sleep > maxPauseTime ? maxPauseTime : sleep;
2546        LOG.debug("(#" + tries + ") Sleeping: " + sleep +
2547          "ms while waiting for snapshot completion.");
2548        Thread.sleep(sleep);
2549      } catch (InterruptedException e) {
2550        throw (InterruptedIOException)new InterruptedIOException("Interrupted").initCause(e);
2551      }
2552      LOG.debug("Getting current status of snapshot from master...");
2553      done = executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection(),
2554          getRpcControllerFactory()) {
2555        @Override
2556        protected IsSnapshotDoneResponse rpcCall() throws Exception {
2557          return master.isSnapshotDone(getRpcController(), request);
2558        }
2559      });
2560    }
2561    if (!done.getDone()) {
2562      throw new SnapshotCreationException("Snapshot '" + snapshot.getName()
2563          + "' wasn't completed in expectedTime:" + max + " ms", snapshotDesc);
2564    }
2565  }
2566
2567  @Override
2568  public void snapshotAsync(SnapshotDescription snapshotDesc) throws IOException,
2569      SnapshotCreationException {
2570    asyncSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc));
2571  }
2572
2573  private SnapshotResponse asyncSnapshot(SnapshotProtos.SnapshotDescription snapshot)
2574      throws IOException {
2575    ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2576    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot)
2577        .build();
2578    // run the snapshot on the master
2579    return executeCallable(new MasterCallable<SnapshotResponse>(getConnection(),
2580        getRpcControllerFactory()) {
2581      @Override
2582      protected SnapshotResponse rpcCall() throws Exception {
2583        return master.snapshot(getRpcController(), request);
2584      }
2585    });
2586  }
2587
2588  @Override
2589  public boolean isSnapshotFinished(final SnapshotDescription snapshotDesc)
2590      throws IOException, HBaseSnapshotException, UnknownSnapshotException {
2591    final SnapshotProtos.SnapshotDescription snapshot =
2592        ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
2593    return executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection(),
2594        getRpcControllerFactory()) {
2595      @Override
2596      protected IsSnapshotDoneResponse rpcCall() throws Exception {
2597        return master.isSnapshotDone(getRpcController(),
2598          IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build());
2599      }
2600    }).getDone();
2601  }
2602
2603  @Override
2604  public void restoreSnapshot(final byte[] snapshotName)
2605      throws IOException, RestoreSnapshotException {
2606    restoreSnapshot(Bytes.toString(snapshotName));
2607  }
2608
2609  @Override
2610  public void restoreSnapshot(final String snapshotName)
2611      throws IOException, RestoreSnapshotException {
2612    boolean takeFailSafeSnapshot =
2613        conf.getBoolean(HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
2614          HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
2615    restoreSnapshot(snapshotName, takeFailSafeSnapshot);
2616  }
2617
2618  @Override
2619  public void restoreSnapshot(final byte[] snapshotName, final boolean takeFailSafeSnapshot)
2620      throws IOException, RestoreSnapshotException {
2621    restoreSnapshot(Bytes.toString(snapshotName), takeFailSafeSnapshot);
2622  }
2623
2624  /*
2625   * Check whether the snapshot exists and contains disabled table
2626   *
2627   * @param snapshotName name of the snapshot to restore
2628   * @throws IOException if a remote or network exception occurs
2629   * @throws RestoreSnapshotException if no valid snapshot is found
2630   */
2631  private TableName getTableNameBeforeRestoreSnapshot(final String snapshotName)
2632      throws IOException, RestoreSnapshotException {
2633    TableName tableName = null;
2634    for (SnapshotDescription snapshotInfo: listSnapshots()) {
2635      if (snapshotInfo.getName().equals(snapshotName)) {
2636        tableName = snapshotInfo.getTableName();
2637        break;
2638      }
2639    }
2640
2641    if (tableName == null) {
2642      throw new RestoreSnapshotException(
2643        "Unable to find the table name for snapshot=" + snapshotName);
2644    }
2645    return tableName;
2646  }
2647
2648  @Override
2649  public void restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot)
2650      throws IOException, RestoreSnapshotException {
2651    restoreSnapshot(snapshotName, takeFailSafeSnapshot, false);
2652  }
2653
2654  @Override
2655  public void restoreSnapshot(final String snapshotName, final boolean takeFailSafeSnapshot,
2656      final boolean restoreAcl) throws IOException, RestoreSnapshotException {
2657    TableName tableName = getTableNameBeforeRestoreSnapshot(snapshotName);
2658
2659    // The table does not exists, switch to clone.
2660    if (!tableExists(tableName)) {
2661      cloneSnapshot(snapshotName, tableName, restoreAcl);
2662      return;
2663    }
2664
2665    // Check if the table is disabled
2666    if (!isTableDisabled(tableName)) {
2667      throw new TableNotDisabledException(tableName);
2668    }
2669
2670    // Take a snapshot of the current state
2671    String failSafeSnapshotSnapshotName = null;
2672    if (takeFailSafeSnapshot) {
2673      failSafeSnapshotSnapshotName = conf.get("hbase.snapshot.restore.failsafe.name",
2674        "hbase-failsafe-{snapshot.name}-{restore.timestamp}");
2675      failSafeSnapshotSnapshotName = failSafeSnapshotSnapshotName
2676        .replace("{snapshot.name}", snapshotName)
2677        .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
2678        .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
2679      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2680      snapshot(failSafeSnapshotSnapshotName, tableName);
2681    }
2682
2683    try {
2684      // Restore snapshot
2685      get(
2686        internalRestoreSnapshotAsync(snapshotName, tableName, restoreAcl),
2687        syncWaitTimeout,
2688        TimeUnit.MILLISECONDS);
2689    } catch (IOException e) {
2690      // Something went wrong during the restore...
2691      // if the pre-restore snapshot is available try to rollback
2692      if (takeFailSafeSnapshot) {
2693        try {
2694          get(
2695            internalRestoreSnapshotAsync(failSafeSnapshotSnapshotName, tableName, restoreAcl),
2696            syncWaitTimeout,
2697            TimeUnit.MILLISECONDS);
2698          String msg = "Restore snapshot=" + snapshotName +
2699            " failed. Rollback to snapshot=" + failSafeSnapshotSnapshotName + " succeeded.";
2700          LOG.error(msg, e);
2701          throw new RestoreSnapshotException(msg, e);
2702        } catch (IOException ex) {
2703          String msg = "Failed to restore and rollback to snapshot=" + failSafeSnapshotSnapshotName;
2704          LOG.error(msg, ex);
2705          throw new RestoreSnapshotException(msg, e);
2706        }
2707      } else {
2708        throw new RestoreSnapshotException("Failed to restore snapshot=" + snapshotName, e);
2709      }
2710    }
2711
2712    // If the restore is succeeded, delete the pre-restore snapshot
2713    if (takeFailSafeSnapshot) {
2714      try {
2715        LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2716        deleteSnapshot(failSafeSnapshotSnapshotName);
2717      } catch (IOException e) {
2718        LOG.error("Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, e);
2719      }
2720    }
2721  }
2722
2723  @Override
2724  public Future<Void> restoreSnapshotAsync(final String snapshotName)
2725      throws IOException, RestoreSnapshotException {
2726    TableName tableName = getTableNameBeforeRestoreSnapshot(snapshotName);
2727
2728    // The table does not exists, switch to clone.
2729    if (!tableExists(tableName)) {
2730      return cloneSnapshotAsync(snapshotName, tableName);
2731    }
2732
2733    // Check if the table is disabled
2734    if (!isTableDisabled(tableName)) {
2735      throw new TableNotDisabledException(tableName);
2736    }
2737
2738    return internalRestoreSnapshotAsync(snapshotName, tableName, false);
2739  }
2740
2741  @Override
2742  public void cloneSnapshot(final byte[] snapshotName, final TableName tableName)
2743      throws IOException, TableExistsException, RestoreSnapshotException {
2744    cloneSnapshot(Bytes.toString(snapshotName), tableName);
2745  }
2746
2747  @Override
2748  public void cloneSnapshot(String snapshotName, TableName tableName, boolean restoreAcl)
2749      throws IOException, TableExistsException, RestoreSnapshotException {
2750    if (tableExists(tableName)) {
2751      throw new TableExistsException(tableName);
2752    }
2753    get(
2754      internalRestoreSnapshotAsync(snapshotName, tableName, restoreAcl),
2755      Integer.MAX_VALUE,
2756      TimeUnit.MILLISECONDS);
2757  }
2758
2759  @Override
2760  public void cloneSnapshot(final String snapshotName, final TableName tableName)
2761      throws IOException, TableExistsException, RestoreSnapshotException {
2762    cloneSnapshot(snapshotName, tableName, false);
2763  }
2764
2765  @Override
2766  public Future<Void> cloneSnapshotAsync(final String snapshotName, final TableName tableName)
2767      throws IOException, TableExistsException {
2768    if (tableExists(tableName)) {
2769      throw new TableExistsException(tableName);
2770    }
2771    return internalRestoreSnapshotAsync(snapshotName, tableName, false);
2772  }
2773
2774  @Override
2775  public byte[] execProcedureWithReturn(String signature, String instance, Map<String,
2776      String> props) throws IOException {
2777    ProcedureDescription desc = ProtobufUtil.buildProcedureDescription(signature, instance, props);
2778    final ExecProcedureRequest request =
2779        ExecProcedureRequest.newBuilder().setProcedure(desc).build();
2780    // run the procedure on the master
2781    ExecProcedureResponse response = executeCallable(
2782      new MasterCallable<ExecProcedureResponse>(getConnection(), getRpcControllerFactory()) {
2783        @Override
2784        protected ExecProcedureResponse rpcCall() throws Exception {
2785          return master.execProcedureWithRet(getRpcController(), request);
2786        }
2787      });
2788
2789    return response.hasReturnData() ? response.getReturnData().toByteArray() : null;
2790  }
2791
2792  @Override
2793  public void execProcedure(String signature, String instance, Map<String, String> props)
2794      throws IOException {
2795    ProcedureDescription desc = ProtobufUtil.buildProcedureDescription(signature, instance, props);
2796    final ExecProcedureRequest request =
2797        ExecProcedureRequest.newBuilder().setProcedure(desc).build();
2798    // run the procedure on the master
2799    ExecProcedureResponse response = executeCallable(new MasterCallable<ExecProcedureResponse>(
2800        getConnection(), getRpcControllerFactory()) {
2801      @Override
2802      protected ExecProcedureResponse rpcCall() throws Exception {
2803        return master.execProcedure(getRpcController(), request);
2804      }
2805    });
2806
2807    long start = EnvironmentEdgeManager.currentTime();
2808    long max = response.getExpectedTimeout();
2809    long maxPauseTime = max / this.numRetries;
2810    int tries = 0;
2811    LOG.debug("Waiting a max of " + max + " ms for procedure '" +
2812        signature + " : " + instance + "'' to complete. (max " + maxPauseTime + " ms per retry)");
2813    boolean done = false;
2814    while (tries == 0
2815        || ((EnvironmentEdgeManager.currentTime() - start) < max && !done)) {
2816      try {
2817        // sleep a backoff <= pauseTime amount
2818        long sleep = getPauseTime(tries++);
2819        sleep = sleep > maxPauseTime ? maxPauseTime : sleep;
2820        LOG.debug("(#" + tries + ") Sleeping: " + sleep +
2821          "ms while waiting for procedure completion.");
2822        Thread.sleep(sleep);
2823      } catch (InterruptedException e) {
2824        throw (InterruptedIOException)new InterruptedIOException("Interrupted").initCause(e);
2825      }
2826      LOG.debug("Getting current status of procedure from master...");
2827      done = isProcedureFinished(signature, instance, props);
2828    }
2829    if (!done) {
2830      throw new IOException("Procedure '" + signature + " : " + instance
2831          + "' wasn't completed in expectedTime:" + max + " ms");
2832    }
2833  }
2834
2835  @Override
2836  public boolean isProcedureFinished(String signature, String instance, Map<String, String> props)
2837      throws IOException {
2838    ProcedureDescription desc = ProtobufUtil.buildProcedureDescription(signature, instance, props);
2839    return executeCallable(
2840      new MasterCallable<IsProcedureDoneResponse>(getConnection(), getRpcControllerFactory()) {
2841        @Override
2842        protected IsProcedureDoneResponse rpcCall() throws Exception {
2843          return master.isProcedureDone(getRpcController(),
2844            IsProcedureDoneRequest.newBuilder().setProcedure(desc).build());
2845        }
2846      }).getDone();
2847  }
2848
2849  /**
2850   * Execute Restore/Clone snapshot and wait for the server to complete (blocking).
2851   * To check if the cloned table exists, use {@link #isTableAvailable} -- it is not safe to
2852   * create an HTable instance to this table before it is available.
2853   * @param snapshotName snapshot to restore
2854   * @param tableName table name to restore the snapshot on
2855   * @throws IOException if a remote or network exception occurs
2856   * @throws RestoreSnapshotException if snapshot failed to be restored
2857   * @throws IllegalArgumentException if the restore request is formatted incorrectly
2858   */
2859  private Future<Void> internalRestoreSnapshotAsync(final String snapshotName,
2860      final TableName tableName, final boolean restoreAcl)
2861      throws IOException, RestoreSnapshotException {
2862    final SnapshotProtos.SnapshotDescription snapshot =
2863        SnapshotProtos.SnapshotDescription.newBuilder()
2864        .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2865
2866    // actually restore the snapshot
2867    ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2868
2869    RestoreSnapshotResponse response = executeCallable(
2870        new MasterCallable<RestoreSnapshotResponse>(getConnection(), getRpcControllerFactory()) {
2871          Long nonceGroup = ng.getNonceGroup();
2872          Long nonce = ng.newNonce();
2873      @Override
2874      protected RestoreSnapshotResponse rpcCall() throws Exception {
2875        final RestoreSnapshotRequest request = RestoreSnapshotRequest.newBuilder()
2876            .setSnapshot(snapshot)
2877            .setNonceGroup(nonceGroup)
2878            .setNonce(nonce)
2879            .setRestoreACL(restoreAcl)
2880            .build();
2881        return master.restoreSnapshot(getRpcController(), request);
2882      }
2883    });
2884
2885    return new RestoreSnapshotFuture(this, snapshot, tableName, response);
2886  }
2887
2888  private static class RestoreSnapshotFuture extends TableFuture<Void> {
2889    public RestoreSnapshotFuture(
2890        final HBaseAdmin admin,
2891        final SnapshotProtos.SnapshotDescription snapshot,
2892        final TableName tableName,
2893        final RestoreSnapshotResponse response) {
2894      super(admin, tableName,
2895          (response != null && response.hasProcId()) ? response.getProcId() : null);
2896
2897      if (response != null && !response.hasProcId()) {
2898        throw new UnsupportedOperationException("Client could not call old version of Server");
2899      }
2900    }
2901
2902    public RestoreSnapshotFuture(
2903        final HBaseAdmin admin,
2904        final TableName tableName,
2905        final Long procId) {
2906      super(admin, tableName, procId);
2907    }
2908
2909    @Override
2910    public String getOperationType() {
2911      return "MODIFY";
2912    }
2913  }
2914
2915  @Override
2916  public List<SnapshotDescription> listSnapshots() throws IOException {
2917    return executeCallable(new MasterCallable<List<SnapshotDescription>>(getConnection(),
2918        getRpcControllerFactory()) {
2919      @Override
2920      protected List<SnapshotDescription> rpcCall() throws Exception {
2921        List<SnapshotProtos.SnapshotDescription> snapshotsList = master
2922            .getCompletedSnapshots(getRpcController(),
2923                GetCompletedSnapshotsRequest.newBuilder().build())
2924            .getSnapshotsList();
2925        List<SnapshotDescription> result = new ArrayList<>(snapshotsList.size());
2926        for (SnapshotProtos.SnapshotDescription snapshot : snapshotsList) {
2927          result.add(ProtobufUtil.createSnapshotDesc(snapshot));
2928        }
2929        return result;
2930      }
2931    });
2932  }
2933
2934  @Override
2935  public List<SnapshotDescription> listSnapshots(String regex) throws IOException {
2936    return listSnapshots(Pattern.compile(regex));
2937  }
2938
2939  @Override
2940  public List<SnapshotDescription> listSnapshots(Pattern pattern) throws IOException {
2941    List<SnapshotDescription> matched = new LinkedList<>();
2942    List<SnapshotDescription> snapshots = listSnapshots();
2943    for (SnapshotDescription snapshot : snapshots) {
2944      if (pattern.matcher(snapshot.getName()).matches()) {
2945        matched.add(snapshot);
2946      }
2947    }
2948    return matched;
2949  }
2950
2951  @Override
2952  public List<SnapshotDescription> listTableSnapshots(String tableNameRegex,
2953      String snapshotNameRegex) throws IOException {
2954    return listTableSnapshots(Pattern.compile(tableNameRegex), Pattern.compile(snapshotNameRegex));
2955  }
2956
2957  @Override
2958  public List<SnapshotDescription> listTableSnapshots(Pattern tableNamePattern,
2959      Pattern snapshotNamePattern) throws IOException {
2960    TableName[] tableNames = listTableNames(tableNamePattern);
2961
2962    List<SnapshotDescription> tableSnapshots = new LinkedList<>();
2963    List<SnapshotDescription> snapshots = listSnapshots(snapshotNamePattern);
2964
2965    List<TableName> listOfTableNames = Arrays.asList(tableNames);
2966    for (SnapshotDescription snapshot : snapshots) {
2967      if (listOfTableNames.contains(snapshot.getTableName())) {
2968        tableSnapshots.add(snapshot);
2969      }
2970    }
2971    return tableSnapshots;
2972  }
2973
2974  @Override
2975  public void deleteSnapshot(final byte[] snapshotName) throws IOException {
2976    deleteSnapshot(Bytes.toString(snapshotName));
2977  }
2978
2979  @Override
2980  public void deleteSnapshot(final String snapshotName) throws IOException {
2981    // make sure the snapshot is possibly valid
2982    TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(snapshotName));
2983    // do the delete
2984    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
2985      @Override
2986      protected Void rpcCall() throws Exception {
2987        master.deleteSnapshot(getRpcController(),
2988          DeleteSnapshotRequest.newBuilder().setSnapshot(
2989                SnapshotProtos.SnapshotDescription.newBuilder().setName(snapshotName).build())
2990              .build()
2991        );
2992        return null;
2993      }
2994    });
2995  }
2996
2997  @Override
2998  public void deleteSnapshots(final String regex) throws IOException {
2999    deleteSnapshots(Pattern.compile(regex));
3000  }
3001
3002  @Override
3003  public void deleteSnapshots(final Pattern pattern) throws IOException {
3004    List<SnapshotDescription> snapshots = listSnapshots(pattern);
3005    for (final SnapshotDescription snapshot : snapshots) {
3006      try {
3007        internalDeleteSnapshot(snapshot);
3008      } catch (IOException ex) {
3009        LOG.info("Failed to delete snapshot " + snapshot.getName() + " for table "
3010                + snapshot.getTableNameAsString(), ex);
3011      }
3012    }
3013  }
3014
3015  private void internalDeleteSnapshot(final SnapshotDescription snapshot) throws IOException {
3016    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
3017      @Override
3018      protected Void rpcCall() throws Exception {
3019        this.master.deleteSnapshot(getRpcController(), DeleteSnapshotRequest.newBuilder()
3020          .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build());
3021        return null;
3022      }
3023    });
3024  }
3025
3026  @Override
3027  public void deleteTableSnapshots(String tableNameRegex, String snapshotNameRegex)
3028      throws IOException {
3029    deleteTableSnapshots(Pattern.compile(tableNameRegex), Pattern.compile(snapshotNameRegex));
3030  }
3031
3032  @Override
3033  public void deleteTableSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern)
3034      throws IOException {
3035    List<SnapshotDescription> snapshots = listTableSnapshots(tableNamePattern, snapshotNamePattern);
3036    for (SnapshotDescription snapshot : snapshots) {
3037      try {
3038        internalDeleteSnapshot(snapshot);
3039        LOG.debug("Successfully deleted snapshot: " + snapshot.getName());
3040      } catch (IOException e) {
3041        LOG.error("Failed to delete snapshot: " + snapshot.getName(), e);
3042      }
3043    }
3044  }
3045
3046  @Override
3047  public void setQuota(final QuotaSettings quota) throws IOException {
3048    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
3049      @Override
3050      protected Void rpcCall() throws Exception {
3051        this.master.setQuota(getRpcController(), QuotaSettings.buildSetQuotaRequestProto(quota));
3052        return null;
3053      }
3054    });
3055  }
3056
3057  @Override
3058  public QuotaRetriever getQuotaRetriever(final QuotaFilter filter) throws IOException {
3059    return QuotaRetriever.open(conf, filter);
3060  }
3061
3062  @Override
3063  public List<QuotaSettings> getQuota(QuotaFilter filter) throws IOException {
3064    List<QuotaSettings> quotas = new LinkedList<>();
3065    try (QuotaRetriever retriever = QuotaRetriever.open(conf, filter)) {
3066      Iterator<QuotaSettings> iterator = retriever.iterator();
3067      while (iterator.hasNext()) {
3068        quotas.add(iterator.next());
3069      }
3070    }
3071    return quotas;
3072  }
3073
3074  private <C extends RetryingCallable<V> & Closeable, V> V executeCallable(C callable)
3075      throws IOException {
3076    return executeCallable(callable, rpcCallerFactory, operationTimeout, rpcTimeout);
3077  }
3078
3079  static private <C extends RetryingCallable<V> & Closeable, V> V executeCallable(C callable,
3080             RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout, int rpcTimeout)
3081  throws IOException {
3082    RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller(rpcTimeout);
3083    try {
3084      return caller.callWithRetries(callable, operationTimeout);
3085    } finally {
3086      callable.close();
3087    }
3088  }
3089
3090  @Override
3091  // Coprocessor Endpoint against the Master.
3092  public CoprocessorRpcChannel coprocessorService() {
3093    return new SyncCoprocessorRpcChannel() {
3094      @Override
3095      protected Message callExecService(final RpcController controller,
3096          final Descriptors.MethodDescriptor method, final Message request,
3097          final Message responsePrototype)
3098      throws IOException {
3099        if (LOG.isTraceEnabled()) {
3100          LOG.trace("Call: " + method.getName() + ", " + request.toString());
3101        }
3102        // Try-with-resources so close gets called when we are done.
3103        try (MasterCallable<CoprocessorServiceResponse> callable =
3104            new MasterCallable<CoprocessorServiceResponse>(connection,
3105                connection.getRpcControllerFactory()) {
3106          @Override
3107          protected CoprocessorServiceResponse rpcCall() throws Exception {
3108            CoprocessorServiceRequest csr =
3109                CoprocessorRpcUtils.getCoprocessorServiceRequest(method, request);
3110            return this.master.execMasterService(getRpcController(), csr);
3111          }
3112        }) {
3113          // TODO: Are we retrying here? Does not seem so. We should use RetryingRpcCaller
3114          callable.prepare(false);
3115          int operationTimeout = connection.getConnectionConfiguration().getOperationTimeout();
3116          CoprocessorServiceResponse result = callable.call(operationTimeout);
3117          return CoprocessorRpcUtils.getResponse(result, responsePrototype);
3118        }
3119      }
3120    };
3121  }
3122
3123  /**
3124   * Simple {@link Abortable}, throwing RuntimeException on abort.
3125   */
3126  private static class ThrowableAbortable implements Abortable {
3127    @Override
3128    public void abort(String why, Throwable e) {
3129      throw new RuntimeException(why, e);
3130    }
3131
3132    @Override
3133    public boolean isAborted() {
3134      return true;
3135    }
3136  }
3137
3138  @Override
3139  public CoprocessorRpcChannel coprocessorService(final ServerName serverName) {
3140    return new SyncCoprocessorRpcChannel() {
3141      @Override
3142      protected Message callExecService(RpcController controller,
3143          Descriptors.MethodDescriptor method, Message request, Message responsePrototype)
3144      throws IOException {
3145        if (LOG.isTraceEnabled()) {
3146          LOG.trace("Call: " + method.getName() + ", " + request.toString());
3147        }
3148        CoprocessorServiceRequest csr =
3149            CoprocessorRpcUtils.getCoprocessorServiceRequest(method, request);
3150        // TODO: Are we retrying here? Does not seem so. We should use RetryingRpcCaller
3151        // TODO: Make this same as RegionCoprocessorRpcChannel and MasterCoprocessorRpcChannel. They
3152        // are all different though should do same thing; e.g. RpcChannel setup.
3153        ClientProtos.ClientService.BlockingInterface stub = connection.getClient(serverName);
3154        CoprocessorServiceResponse result;
3155        try {
3156          result = stub.
3157              execRegionServerService(connection.getRpcControllerFactory().newController(), csr);
3158          return CoprocessorRpcUtils.getResponse(result, responsePrototype);
3159        } catch (ServiceException e) {
3160          throw ProtobufUtil.handleRemoteException(e);
3161        }
3162      }
3163    };
3164  }
3165
3166  @Override
3167  public void updateConfiguration(final ServerName server) throws IOException {
3168    final AdminService.BlockingInterface admin = this.connection.getAdmin(server);
3169    Callable<Void> callable = new Callable<Void>() {
3170      @Override
3171      public Void call() throws Exception {
3172        admin.updateConfiguration(null, UpdateConfigurationRequest.getDefaultInstance());
3173        return null;
3174      }
3175    };
3176    ProtobufUtil.call(callable);
3177  }
3178
3179  @Override
3180  public void updateConfiguration() throws IOException {
3181    ClusterMetrics status = getClusterMetrics(
3182      EnumSet.of(Option.LIVE_SERVERS, Option.MASTER, Option.BACKUP_MASTERS));
3183    for (ServerName server : status.getLiveServerMetrics().keySet()) {
3184      updateConfiguration(server);
3185    }
3186
3187    updateConfiguration(status.getMasterName());
3188
3189    for (ServerName server : status.getBackupMasterNames()) {
3190      updateConfiguration(server);
3191    }
3192  }
3193
3194  @Override
3195  public long getLastMajorCompactionTimestamp(final TableName tableName) throws IOException {
3196    return executeCallable(new MasterCallable<Long>(getConnection(), getRpcControllerFactory()) {
3197      @Override
3198      protected Long rpcCall() throws Exception {
3199        MajorCompactionTimestampRequest req =
3200            MajorCompactionTimestampRequest.newBuilder()
3201                .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
3202        return master.getLastMajorCompactionTimestamp(getRpcController(), req).
3203            getCompactionTimestamp();
3204      }
3205    });
3206  }
3207
3208  @Override
3209  public long getLastMajorCompactionTimestampForRegion(final byte[] regionName) throws IOException {
3210    return executeCallable(new MasterCallable<Long>(getConnection(), getRpcControllerFactory()) {
3211      @Override
3212      protected Long rpcCall() throws Exception {
3213        MajorCompactionTimestampForRegionRequest req =
3214            MajorCompactionTimestampForRegionRequest.newBuilder().setRegion(RequestConverter
3215                      .buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)).build();
3216        return master.getLastMajorCompactionTimestampForRegion(getRpcController(), req)
3217            .getCompactionTimestamp();
3218      }
3219    });
3220  }
3221
3222  /**
3223   * {@inheritDoc}
3224   */
3225  @Override
3226  public void compact(final TableName tableName, final byte[] columnFamily, CompactType compactType)
3227    throws IOException, InterruptedException {
3228    compact(tableName, columnFamily, false, compactType);
3229  }
3230
3231  /**
3232   * {@inheritDoc}
3233   */
3234  @Override
3235  public void compact(final TableName tableName, CompactType compactType)
3236    throws IOException, InterruptedException {
3237    compact(tableName, null, false, compactType);
3238  }
3239
3240  /**
3241   * {@inheritDoc}
3242   */
3243  @Override
3244  public void majorCompact(final TableName tableName, final byte[] columnFamily,
3245    CompactType compactType) throws IOException, InterruptedException {
3246    compact(tableName, columnFamily, true, compactType);
3247  }
3248
3249  /**
3250   * {@inheritDoc}
3251   */
3252  @Override
3253  public void majorCompact(final TableName tableName, CompactType compactType)
3254          throws IOException, InterruptedException {
3255    compact(tableName, null, true, compactType);
3256  }
3257
3258  /**
3259   * {@inheritDoc}
3260   */
3261  @Override
3262  public CompactionState getCompactionState(final TableName tableName, CompactType compactType)
3263      throws IOException {
3264    AdminProtos.GetRegionInfoResponse.CompactionState state =
3265      AdminProtos.GetRegionInfoResponse.CompactionState.NONE;
3266    checkTableExists(tableName);
3267    // TODO: There is no timeout on this controller. Set one!
3268    HBaseRpcController rpcController = rpcControllerFactory.newController();
3269    switch (compactType) {
3270      case MOB:
3271        final AdminProtos.AdminService.BlockingInterface masterAdmin =
3272          this.connection.getAdminForMaster();
3273        Callable<AdminProtos.GetRegionInfoResponse.CompactionState> callable =
3274          new Callable<AdminProtos.GetRegionInfoResponse.CompactionState>() {
3275            @Override
3276            public AdminProtos.GetRegionInfoResponse.CompactionState call() throws Exception {
3277              RegionInfo info = RegionInfo.createMobRegionInfo(tableName);
3278              GetRegionInfoRequest request =
3279                RequestConverter.buildGetRegionInfoRequest(info.getRegionName(), true);
3280              GetRegionInfoResponse response = masterAdmin.getRegionInfo(rpcController, request);
3281              return response.getCompactionState();
3282            }
3283          };
3284        state = ProtobufUtil.call(callable);
3285        break;
3286      case NORMAL:
3287        for (HRegionLocation loc : connection.locateRegions(tableName, false, false)) {
3288          ServerName sn = loc.getServerName();
3289          if (sn == null) {
3290            continue;
3291          }
3292          byte[] regionName = loc.getRegion().getRegionName();
3293          AdminService.BlockingInterface snAdmin = this.connection.getAdmin(sn);
3294          try {
3295            Callable<GetRegionInfoResponse> regionInfoCallable =
3296              new Callable<GetRegionInfoResponse>() {
3297                @Override
3298                public GetRegionInfoResponse call() throws Exception {
3299                  GetRegionInfoRequest request =
3300                    RequestConverter.buildGetRegionInfoRequest(regionName, true);
3301                  return snAdmin.getRegionInfo(rpcController, request);
3302                }
3303              };
3304            GetRegionInfoResponse response = ProtobufUtil.call(regionInfoCallable);
3305            switch (response.getCompactionState()) {
3306              case MAJOR_AND_MINOR:
3307                return CompactionState.MAJOR_AND_MINOR;
3308              case MAJOR:
3309                if (state == AdminProtos.GetRegionInfoResponse.CompactionState.MINOR) {
3310                  return CompactionState.MAJOR_AND_MINOR;
3311                }
3312                state = AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR;
3313                break;
3314              case MINOR:
3315                if (state == AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR) {
3316                  return CompactionState.MAJOR_AND_MINOR;
3317                }
3318                state = AdminProtos.GetRegionInfoResponse.CompactionState.MINOR;
3319                break;
3320              case NONE:
3321              default: // nothing, continue
3322            }
3323          } catch (NotServingRegionException e) {
3324            if (LOG.isDebugEnabled()) {
3325              LOG.debug("Trying to get compaction state of " + loc.getRegion() + ": " +
3326                StringUtils.stringifyException(e));
3327            }
3328          } catch (RemoteException e) {
3329            if (e.getMessage().indexOf(NotServingRegionException.class.getName()) >= 0) {
3330              if (LOG.isDebugEnabled()) {
3331                LOG.debug("Trying to get compaction state of " + loc.getRegion() + ": " +
3332                  StringUtils.stringifyException(e));
3333              }
3334            } else {
3335              throw e;
3336            }
3337          }
3338        }
3339        break;
3340      default:
3341        throw new IllegalArgumentException("Unknown compactType: " + compactType);
3342    }
3343    if (state != null) {
3344      return ProtobufUtil.createCompactionState(state);
3345    }
3346    return null;
3347  }
3348
3349  /**
3350   * Future that waits on a procedure result.
3351   * Returned by the async version of the Admin calls,
3352   * and used internally by the sync calls to wait on the result of the procedure.
3353   */
3354  @InterfaceAudience.Private
3355  @InterfaceStability.Evolving
3356  protected static class ProcedureFuture<V> implements Future<V> {
3357    private ExecutionException exception = null;
3358    private boolean procResultFound = false;
3359    private boolean done = false;
3360    private boolean cancelled = false;
3361    private V result = null;
3362
3363    private final HBaseAdmin admin;
3364    protected final Long procId;
3365
3366    public ProcedureFuture(final HBaseAdmin admin, final Long procId) {
3367      this.admin = admin;
3368      this.procId = procId;
3369    }
3370
3371    @Override
3372    public boolean cancel(boolean mayInterruptIfRunning) {
3373      AbortProcedureRequest abortProcRequest = AbortProcedureRequest.newBuilder()
3374          .setProcId(procId).setMayInterruptIfRunning(mayInterruptIfRunning).build();
3375      try {
3376        cancelled = abortProcedureResult(abortProcRequest).getIsProcedureAborted();
3377        if (cancelled) {
3378          done = true;
3379        }
3380      } catch (IOException e) {
3381        // Cancell thrown exception for some reason. At this time, we are not sure whether
3382        // the cancell succeeds or fails. We assume that it is failed, but print out a warning
3383        // for debugging purpose.
3384        LOG.warn(
3385          "Cancelling the procedure with procId=" + procId + " throws exception " + e.getMessage(),
3386          e);
3387        cancelled = false;
3388      }
3389      return cancelled;
3390    }
3391
3392    @Override
3393    public boolean isCancelled() {
3394      return cancelled;
3395    }
3396
3397    protected AbortProcedureResponse abortProcedureResult(
3398        final AbortProcedureRequest request) throws IOException {
3399      return admin.executeCallable(new MasterCallable<AbortProcedureResponse>(
3400          admin.getConnection(), admin.getRpcControllerFactory()) {
3401        @Override
3402        protected AbortProcedureResponse rpcCall() throws Exception {
3403          return master.abortProcedure(getRpcController(), request);
3404        }
3405      });
3406    }
3407
3408    @Override
3409    public V get() throws InterruptedException, ExecutionException {
3410      // TODO: should we ever spin forever?
3411      // fix HBASE-21715. TODO: If the function call get() without timeout limit is not allowed,
3412      // is it possible to compose instead of inheriting from the class Future for this class?
3413      try {
3414        return get(admin.getProcedureTimeout, TimeUnit.MILLISECONDS);
3415      } catch (TimeoutException e) {
3416        LOG.warn("Failed to get the procedure with procId=" + procId + " throws exception " + e
3417            .getMessage(), e);
3418        return null;
3419      }
3420    }
3421
3422    @Override
3423    public V get(long timeout, TimeUnit unit)
3424        throws InterruptedException, ExecutionException, TimeoutException {
3425      if (!done) {
3426        long deadlineTs = EnvironmentEdgeManager.currentTime() + unit.toMillis(timeout);
3427        try {
3428          try {
3429            // if the master support procedures, try to wait the result
3430            if (procId != null) {
3431              result = waitProcedureResult(procId, deadlineTs);
3432            }
3433            // if we don't have a proc result, try the compatibility wait
3434            if (!procResultFound) {
3435              result = waitOperationResult(deadlineTs);
3436            }
3437            result = postOperationResult(result, deadlineTs);
3438            done = true;
3439          } catch (IOException e) {
3440            result = postOperationFailure(e, deadlineTs);
3441            done = true;
3442          }
3443        } catch (IOException e) {
3444          exception = new ExecutionException(e);
3445          done = true;
3446        }
3447      }
3448      if (exception != null) {
3449        throw exception;
3450      }
3451      return result;
3452    }
3453
3454    @Override
3455    public boolean isDone() {
3456      return done;
3457    }
3458
3459    protected HBaseAdmin getAdmin() {
3460      return admin;
3461    }
3462
3463    private V waitProcedureResult(long procId, long deadlineTs)
3464        throws IOException, TimeoutException, InterruptedException {
3465      GetProcedureResultRequest request = GetProcedureResultRequest.newBuilder()
3466          .setProcId(procId)
3467          .build();
3468
3469      int tries = 0;
3470      IOException serviceEx = null;
3471      while (EnvironmentEdgeManager.currentTime() < deadlineTs) {
3472        GetProcedureResultResponse response = null;
3473        try {
3474          // Try to fetch the result
3475          response = getProcedureResult(request);
3476        } catch (IOException e) {
3477          serviceEx = unwrapException(e);
3478
3479          // the master may be down
3480          LOG.warn("failed to get the procedure result procId=" + procId, serviceEx);
3481
3482          // Not much to do, if we have a DoNotRetryIOException
3483          if (serviceEx instanceof DoNotRetryIOException) {
3484            // TODO: looks like there is no way to unwrap this exception and get the proper
3485            // UnsupportedOperationException aside from looking at the message.
3486            // anyway, if we fail here we just failover to the compatibility side
3487            // and that is always a valid solution.
3488            LOG.warn("Proc-v2 is unsupported on this master: " + serviceEx.getMessage(), serviceEx);
3489            procResultFound = false;
3490            return null;
3491          }
3492        }
3493
3494        // If the procedure is no longer running, we should have a result
3495        if (response != null && response.getState() != GetProcedureResultResponse.State.RUNNING) {
3496          procResultFound = response.getState() != GetProcedureResultResponse.State.NOT_FOUND;
3497          return convertResult(response);
3498        }
3499
3500        try {
3501          Thread.sleep(getAdmin().getPauseTime(tries++));
3502        } catch (InterruptedException e) {
3503          throw new InterruptedException(
3504            "Interrupted while waiting for the result of proc " + procId);
3505        }
3506      }
3507      if (serviceEx != null) {
3508        throw serviceEx;
3509      } else {
3510        throw new TimeoutException("The procedure " + procId + " is still running");
3511      }
3512    }
3513
3514    private static IOException unwrapException(IOException e) {
3515      if (e instanceof RemoteException) {
3516        return ((RemoteException)e).unwrapRemoteException();
3517      }
3518      return e;
3519    }
3520
3521    protected GetProcedureResultResponse getProcedureResult(final GetProcedureResultRequest request)
3522        throws IOException {
3523      return admin.executeCallable(new MasterCallable<GetProcedureResultResponse>(
3524          admin.getConnection(), admin.getRpcControllerFactory()) {
3525        @Override
3526        protected GetProcedureResultResponse rpcCall() throws Exception {
3527          return master.getProcedureResult(getRpcController(), request);
3528        }
3529      });
3530    }
3531
3532    /**
3533     * Convert the procedure result response to a specified type.
3534     * @param response the procedure result object to parse
3535     * @return the result data of the procedure.
3536     */
3537    protected V convertResult(final GetProcedureResultResponse response) throws IOException {
3538      if (response.hasException()) {
3539        throw ForeignExceptionUtil.toIOException(response.getException());
3540      }
3541      return null;
3542    }
3543
3544    /**
3545     * Fallback implementation in case the procedure is not supported by the server.
3546     * It should try to wait until the operation is completed.
3547     * @param deadlineTs the timestamp after which this method should throw a TimeoutException
3548     * @return the result data of the operation
3549     */
3550    protected V waitOperationResult(final long deadlineTs)
3551        throws IOException, TimeoutException {
3552      return null;
3553    }
3554
3555    /**
3556     * Called after the operation is completed and the result fetched. this allows to perform extra
3557     * steps after the procedure is completed. it allows to apply transformations to the result that
3558     * will be returned by get().
3559     * @param result the result of the procedure
3560     * @param deadlineTs the timestamp after which this method should throw a TimeoutException
3561     * @return the result of the procedure, which may be the same as the passed one
3562     */
3563    protected V postOperationResult(final V result, final long deadlineTs)
3564        throws IOException, TimeoutException {
3565      return result;
3566    }
3567
3568    /**
3569     * Called after the operation is terminated with a failure.
3570     * this allows to perform extra steps after the procedure is terminated.
3571     * it allows to apply transformations to the result that will be returned by get().
3572     * The default implementation will rethrow the exception
3573     * @param exception the exception got from fetching the result
3574     * @param deadlineTs the timestamp after which this method should throw a TimeoutException
3575     * @return the result of the procedure, which may be the same as the passed one
3576     */
3577    protected V postOperationFailure(final IOException exception, final long deadlineTs)
3578        throws IOException, TimeoutException {
3579      throw exception;
3580    }
3581
3582    protected interface WaitForStateCallable {
3583      boolean checkState(int tries) throws IOException;
3584      void throwInterruptedException() throws InterruptedIOException;
3585      void throwTimeoutException(long elapsed) throws TimeoutException;
3586    }
3587
3588    protected void waitForState(final long deadlineTs, final WaitForStateCallable callable)
3589        throws IOException, TimeoutException {
3590      int tries = 0;
3591      IOException serverEx = null;
3592      long startTime = EnvironmentEdgeManager.currentTime();
3593      while (EnvironmentEdgeManager.currentTime() < deadlineTs) {
3594        serverEx = null;
3595        try {
3596          if (callable.checkState(tries)) {
3597            return;
3598          }
3599        } catch (IOException e) {
3600          serverEx = e;
3601        }
3602        try {
3603          Thread.sleep(getAdmin().getPauseTime(tries++));
3604        } catch (InterruptedException e) {
3605          callable.throwInterruptedException();
3606        }
3607      }
3608      if (serverEx != null) {
3609        throw unwrapException(serverEx);
3610      } else {
3611        callable.throwTimeoutException(EnvironmentEdgeManager.currentTime() - startTime);
3612      }
3613    }
3614  }
3615
3616  @InterfaceAudience.Private
3617  @InterfaceStability.Evolving
3618  protected static abstract class TableFuture<V> extends ProcedureFuture<V> {
3619    private final TableName tableName;
3620
3621    public TableFuture(final HBaseAdmin admin, final TableName tableName, final Long procId) {
3622      super(admin, procId);
3623      this.tableName = tableName;
3624    }
3625
3626    @Override
3627    public String toString() {
3628      return getDescription();
3629    }
3630
3631    /**
3632     * @return the table name
3633     */
3634    protected TableName getTableName() {
3635      return tableName;
3636    }
3637
3638    /**
3639     * @return the table descriptor
3640     */
3641    protected TableDescriptor getTableDescriptor() throws IOException {
3642      return getAdmin().getDescriptor(getTableName());
3643    }
3644
3645    /**
3646     * @return the operation type like CREATE, DELETE, DISABLE etc.
3647     */
3648    public abstract String getOperationType();
3649
3650    /**
3651     * @return a description of the operation
3652     */
3653    protected String getDescription() {
3654      return "Operation: " + getOperationType() + ", " + "Table Name: " +
3655        tableName.getNameWithNamespaceInclAsString() + ", procId: " + procId;
3656    }
3657
3658    protected abstract class TableWaitForStateCallable implements WaitForStateCallable {
3659      @Override
3660      public void throwInterruptedException() throws InterruptedIOException {
3661        throw new InterruptedIOException("Interrupted while waiting for " + getDescription());
3662      }
3663
3664      @Override
3665      public void throwTimeoutException(long elapsedTime) throws TimeoutException {
3666        throw new TimeoutException(
3667          getDescription() + " has not completed after " + elapsedTime + "ms");
3668      }
3669    }
3670
3671    @Override
3672    protected V postOperationResult(final V result, final long deadlineTs)
3673        throws IOException, TimeoutException {
3674      LOG.info(getDescription() + " completed");
3675      return super.postOperationResult(result, deadlineTs);
3676    }
3677
3678    @Override
3679    protected V postOperationFailure(final IOException exception, final long deadlineTs)
3680        throws IOException, TimeoutException {
3681      LOG.info(getDescription() + " failed with " + exception.getMessage());
3682      return super.postOperationFailure(exception, deadlineTs);
3683    }
3684
3685    protected void waitForTableEnabled(final long deadlineTs)
3686        throws IOException, TimeoutException {
3687      waitForState(deadlineTs, new TableWaitForStateCallable() {
3688        @Override
3689        public boolean checkState(int tries) throws IOException {
3690          try {
3691            if (getAdmin().isTableAvailable(tableName)) {
3692              return true;
3693            }
3694          } catch (TableNotFoundException tnfe) {
3695            LOG.debug("Table " + tableName.getNameWithNamespaceInclAsString()
3696                + " was not enabled, sleeping. tries=" + tries);
3697          }
3698          return false;
3699        }
3700      });
3701    }
3702
3703    protected void waitForTableDisabled(final long deadlineTs)
3704        throws IOException, TimeoutException {
3705      waitForState(deadlineTs, new TableWaitForStateCallable() {
3706        @Override
3707        public boolean checkState(int tries) throws IOException {
3708          return getAdmin().isTableDisabled(tableName);
3709        }
3710      });
3711    }
3712
3713    protected void waitTableNotFound(final long deadlineTs)
3714        throws IOException, TimeoutException {
3715      waitForState(deadlineTs, new TableWaitForStateCallable() {
3716        @Override
3717        public boolean checkState(int tries) throws IOException {
3718          return !getAdmin().tableExists(tableName);
3719        }
3720      });
3721    }
3722
3723    protected void waitForSchemaUpdate(final long deadlineTs)
3724        throws IOException, TimeoutException {
3725      waitForState(deadlineTs, new TableWaitForStateCallable() {
3726        @Override
3727        public boolean checkState(int tries) throws IOException {
3728          return getAdmin().getAlterStatus(tableName).getFirst() == 0;
3729        }
3730      });
3731    }
3732
3733    protected void waitForAllRegionsOnline(final long deadlineTs, final byte[][] splitKeys)
3734        throws IOException, TimeoutException {
3735      final TableDescriptor desc = getTableDescriptor();
3736      final AtomicInteger actualRegCount = new AtomicInteger(0);
3737      final MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
3738        @Override
3739        public boolean visit(Result rowResult) throws IOException {
3740          RegionLocations list = MetaTableAccessor.getRegionLocations(rowResult);
3741          if (list == null) {
3742            LOG.warn("No serialized HRegionInfo in " + rowResult);
3743            return true;
3744          }
3745          HRegionLocation l = list.getRegionLocation();
3746          if (l == null) {
3747            return true;
3748          }
3749          if (!l.getRegionInfo().getTable().equals(desc.getTableName())) {
3750            return false;
3751          }
3752          if (l.getRegionInfo().isOffline() || l.getRegionInfo().isSplit()) return true;
3753          HRegionLocation[] locations = list.getRegionLocations();
3754          for (HRegionLocation location : locations) {
3755            if (location == null) continue;
3756            ServerName serverName = location.getServerName();
3757            // Make sure that regions are assigned to server
3758            if (serverName != null && serverName.getHostAndPort() != null) {
3759              actualRegCount.incrementAndGet();
3760            }
3761          }
3762          return true;
3763        }
3764      };
3765
3766      int tries = 0;
3767      int numRegs = (splitKeys == null ? 1 : splitKeys.length + 1) * desc.getRegionReplication();
3768      while (EnvironmentEdgeManager.currentTime() < deadlineTs) {
3769        actualRegCount.set(0);
3770        MetaTableAccessor.scanMetaForTableRegions(getAdmin().getConnection(), visitor,
3771          desc.getTableName());
3772        if (actualRegCount.get() == numRegs) {
3773          // all the regions are online
3774          return;
3775        }
3776
3777        try {
3778          Thread.sleep(getAdmin().getPauseTime(tries++));
3779        } catch (InterruptedException e) {
3780          throw new InterruptedIOException("Interrupted when opening" + " regions; "
3781              + actualRegCount.get() + " of " + numRegs + " regions processed so far");
3782        }
3783      }
3784      throw new TimeoutException("Only " + actualRegCount.get() + " of " + numRegs
3785          + " regions are online; retries exhausted.");
3786    }
3787  }
3788
3789  @InterfaceAudience.Private
3790  @InterfaceStability.Evolving
3791  protected static abstract class NamespaceFuture extends ProcedureFuture<Void> {
3792    private final String namespaceName;
3793
3794    public NamespaceFuture(final HBaseAdmin admin, final String namespaceName, final Long procId) {
3795      super(admin, procId);
3796      this.namespaceName = namespaceName;
3797    }
3798
3799    /**
3800     * @return the namespace name
3801     */
3802    protected String getNamespaceName() {
3803      return namespaceName;
3804    }
3805
3806    /**
3807     * @return the operation type like CREATE_NAMESPACE, DELETE_NAMESPACE, etc.
3808     */
3809    public abstract String getOperationType();
3810
3811    @Override
3812    public String toString() {
3813      return "Operation: " + getOperationType() + ", Namespace: " + getNamespaceName();
3814    }
3815  }
3816
3817  @Override
3818  public List<SecurityCapability> getSecurityCapabilities() throws IOException {
3819    try {
3820      return executeCallable(new MasterCallable<List<SecurityCapability>>(getConnection(),
3821          getRpcControllerFactory()) {
3822        @Override
3823        protected List<SecurityCapability> rpcCall() throws Exception {
3824          SecurityCapabilitiesRequest req = SecurityCapabilitiesRequest.newBuilder().build();
3825          return ProtobufUtil.toSecurityCapabilityList(
3826            master.getSecurityCapabilities(getRpcController(), req).getCapabilitiesList());
3827        }
3828      });
3829    } catch (IOException e) {
3830      if (e instanceof RemoteException) {
3831        e = ((RemoteException)e).unwrapRemoteException();
3832      }
3833      throw e;
3834    }
3835  }
3836
3837  @Override
3838  public boolean splitSwitch(boolean enabled, boolean synchronous) throws IOException {
3839    return splitOrMergeSwitch(enabled, synchronous, MasterSwitchType.SPLIT);
3840  }
3841
3842  @Override
3843  public boolean mergeSwitch(boolean enabled, boolean synchronous) throws IOException {
3844    return splitOrMergeSwitch(enabled, synchronous, MasterSwitchType.MERGE);
3845  }
3846
3847  private boolean splitOrMergeSwitch(boolean enabled, boolean synchronous,
3848      MasterSwitchType switchType) throws IOException {
3849    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
3850      @Override
3851      protected Boolean rpcCall() throws Exception {
3852        MasterProtos.SetSplitOrMergeEnabledResponse response = master.setSplitOrMergeEnabled(
3853          getRpcController(),
3854          RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType));
3855        return response.getPrevValueList().get(0);
3856      }
3857    });
3858  }
3859
3860  @Override
3861  public boolean isSplitEnabled() throws IOException {
3862    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
3863      @Override
3864      protected Boolean rpcCall() throws Exception {
3865        return master.isSplitOrMergeEnabled(getRpcController(),
3866          RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.SPLIT)).getEnabled();
3867      }
3868    });
3869  }
3870
3871  @Override
3872  public boolean isMergeEnabled() throws IOException {
3873    return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
3874      @Override
3875      protected Boolean rpcCall() throws Exception {
3876        return master.isSplitOrMergeEnabled(getRpcController(),
3877          RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.MERGE)).getEnabled();
3878      }
3879    });
3880  }
3881
3882  private RpcControllerFactory getRpcControllerFactory() {
3883    return this.rpcControllerFactory;
3884  }
3885
3886  @Override
3887  public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
3888      throws IOException {
3889    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
3890      @Override
3891      protected Void rpcCall() throws Exception {
3892        master.addReplicationPeer(getRpcController(),
3893          RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled));
3894        return null;
3895      }
3896    });
3897  }
3898
3899  @Override
3900  public void removeReplicationPeer(String peerId) throws IOException {
3901    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
3902      @Override
3903      protected Void rpcCall() throws Exception {
3904        master.removeReplicationPeer(getRpcController(),
3905          RequestConverter.buildRemoveReplicationPeerRequest(peerId));
3906        return null;
3907      }
3908    });
3909  }
3910
3911  @Override
3912  public void enableReplicationPeer(final String peerId) throws IOException {
3913    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
3914      @Override
3915      protected Void rpcCall() throws Exception {
3916        master.enableReplicationPeer(getRpcController(),
3917          RequestConverter.buildEnableReplicationPeerRequest(peerId));
3918        return null;
3919      }
3920    });
3921  }
3922
3923  @Override
3924  public void disableReplicationPeer(final String peerId) throws IOException {
3925    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
3926      @Override
3927      protected Void rpcCall() throws Exception {
3928        master.disableReplicationPeer(getRpcController(),
3929          RequestConverter.buildDisableReplicationPeerRequest(peerId));
3930        return null;
3931      }
3932    });
3933  }
3934
3935  @Override
3936  public ReplicationPeerConfig getReplicationPeerConfig(final String peerId) throws IOException {
3937    return executeCallable(new MasterCallable<ReplicationPeerConfig>(getConnection(),
3938        getRpcControllerFactory()) {
3939      @Override
3940      protected ReplicationPeerConfig rpcCall() throws Exception {
3941        GetReplicationPeerConfigResponse response = master.getReplicationPeerConfig(
3942          getRpcController(), RequestConverter.buildGetReplicationPeerConfigRequest(peerId));
3943        return ReplicationPeerConfigUtil.convert(response.getPeerConfig());
3944      }
3945    });
3946  }
3947
3948  @Override
3949  public void updateReplicationPeerConfig(final String peerId,
3950      final ReplicationPeerConfig peerConfig) throws IOException {
3951    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
3952      @Override
3953      protected Void rpcCall() throws Exception {
3954        master.updateReplicationPeerConfig(getRpcController(),
3955          RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig));
3956        return null;
3957      }
3958    });
3959  }
3960
3961  @Override
3962  public void appendReplicationPeerTableCFs(String id,
3963      Map<TableName, List<String>> tableCfs)
3964      throws ReplicationException, IOException {
3965    if (tableCfs == null) {
3966      throw new ReplicationException("tableCfs is null");
3967    }
3968    ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
3969    ReplicationPeerConfig newPeerConfig =
3970        ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
3971    updateReplicationPeerConfig(id, newPeerConfig);
3972  }
3973
3974  @Override
3975  public void removeReplicationPeerTableCFs(String id,
3976      Map<TableName, List<String>> tableCfs)
3977      throws ReplicationException, IOException {
3978    if (tableCfs == null) {
3979      throw new ReplicationException("tableCfs is null");
3980    }
3981    ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
3982    ReplicationPeerConfig newPeerConfig =
3983        ReplicationPeerConfigUtil.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
3984    updateReplicationPeerConfig(id, newPeerConfig);
3985  }
3986
3987  @Override
3988  public List<ReplicationPeerDescription> listReplicationPeers() throws IOException {
3989    return listReplicationPeers((Pattern)null);
3990  }
3991
3992  @Override
3993  public List<ReplicationPeerDescription> listReplicationPeers(Pattern pattern)
3994      throws IOException {
3995    return executeCallable(new MasterCallable<List<ReplicationPeerDescription>>(getConnection(),
3996        getRpcControllerFactory()) {
3997      @Override
3998      protected List<ReplicationPeerDescription> rpcCall() throws Exception {
3999        List<ReplicationProtos.ReplicationPeerDescription> peersList = master.listReplicationPeers(
4000          getRpcController(), RequestConverter.buildListReplicationPeersRequest(pattern))
4001            .getPeerDescList();
4002        List<ReplicationPeerDescription> result = new ArrayList<>(peersList.size());
4003        for (ReplicationProtos.ReplicationPeerDescription peer : peersList) {
4004          result.add(ReplicationPeerConfigUtil.toReplicationPeerDescription(peer));
4005        }
4006        return result;
4007      }
4008    });
4009  }
4010
4011  @Override
4012  public void decommissionRegionServers(List<ServerName> servers, boolean offload)
4013      throws IOException {
4014    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
4015      @Override
4016      public Void rpcCall() throws ServiceException {
4017        master.decommissionRegionServers(getRpcController(),
4018          RequestConverter.buildDecommissionRegionServersRequest(servers, offload));
4019        return null;
4020      }
4021    });
4022  }
4023
4024  @Override
4025  public List<ServerName> listDecommissionedRegionServers() throws IOException {
4026    return executeCallable(new MasterCallable<List<ServerName>>(getConnection(),
4027              getRpcControllerFactory()) {
4028      @Override
4029      public List<ServerName> rpcCall() throws ServiceException {
4030        ListDecommissionedRegionServersRequest req =
4031            ListDecommissionedRegionServersRequest.newBuilder().build();
4032        List<ServerName> servers = new ArrayList<>();
4033        for (HBaseProtos.ServerName server : master
4034            .listDecommissionedRegionServers(getRpcController(), req).getServerNameList()) {
4035          servers.add(ProtobufUtil.toServerName(server));
4036        }
4037        return servers;
4038      }
4039    });
4040  }
4041
4042  @Override
4043  public void recommissionRegionServer(ServerName server, List<byte[]> encodedRegionNames)
4044      throws IOException {
4045    executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
4046      @Override
4047      public Void rpcCall() throws ServiceException {
4048        master.recommissionRegionServer(getRpcController(),
4049          RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames));
4050        return null;
4051      }
4052    });
4053  }
4054
4055  @Override
4056  public List<TableCFs> listReplicatedTableCFs() throws IOException {
4057    List<TableCFs> replicatedTableCFs = new ArrayList<>();
4058    List<TableDescriptor> tables = listTableDescriptors();
4059    tables.forEach(table -> {
4060      Map<String, Integer> cfs = new HashMap<>();
4061      Stream.of(table.getColumnFamilies())
4062          .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
4063          .forEach(column -> {
4064            cfs.put(column.getNameAsString(), column.getScope());
4065          });
4066      if (!cfs.isEmpty()) {
4067        replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
4068      }
4069    });
4070    return replicatedTableCFs;
4071  }
4072
4073  @Override
4074  public void enableTableReplication(final TableName tableName) throws IOException {
4075    if (tableName == null) {
4076      throw new IllegalArgumentException("Table name cannot be null");
4077    }
4078    if (!tableExists(tableName)) {
4079      throw new TableNotFoundException("Table '" + tableName.getNameAsString()
4080          + "' does not exists.");
4081    }
4082    byte[][] splits = getTableSplits(tableName);
4083    checkAndSyncTableDescToPeers(tableName, splits);
4084    setTableRep(tableName, true);
4085  }
4086
4087  @Override
4088  public void disableTableReplication(final TableName tableName) throws IOException {
4089    if (tableName == null) {
4090      throw new IllegalArgumentException("Table name is null");
4091    }
4092    if (!tableExists(tableName)) {
4093      throw new TableNotFoundException("Table '" + tableName.getNameAsString()
4094          + "' does not exists.");
4095    }
4096    setTableRep(tableName, false);
4097  }
4098
4099  /**
4100   * Connect to peer and check the table descriptor on peer:
4101   * <ol>
4102   * <li>Create the same table on peer when not exist.</li>
4103   * <li>Throw an exception if the table already has replication enabled on any of the column
4104   * families.</li>
4105   * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
4106   * </ol>
4107   * @param tableName name of the table to sync to the peer
4108   * @param splits table split keys
4109   * @throws IOException
4110   */
4111  private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits)
4112      throws IOException {
4113    List<ReplicationPeerDescription> peers = listReplicationPeers();
4114    if (peers == null || peers.size() <= 0) {
4115      throw new IllegalArgumentException("Found no peer cluster for replication.");
4116    }
4117
4118    for (ReplicationPeerDescription peerDesc : peers) {
4119      if (peerDesc.getPeerConfig().needToReplicate(tableName)) {
4120        Configuration peerConf =
4121            ReplicationPeerConfigUtil.getPeerClusterConfiguration(this.conf, peerDesc);
4122        try (Connection conn = ConnectionFactory.createConnection(peerConf);
4123            Admin repHBaseAdmin = conn.getAdmin()) {
4124          TableDescriptor tableDesc = getDescriptor(tableName);
4125          TableDescriptor peerTableDesc = null;
4126          if (!repHBaseAdmin.tableExists(tableName)) {
4127            repHBaseAdmin.createTable(tableDesc, splits);
4128          } else {
4129            peerTableDesc = repHBaseAdmin.getDescriptor(tableName);
4130            if (peerTableDesc == null) {
4131              throw new IllegalArgumentException("Failed to get table descriptor for table "
4132                  + tableName.getNameAsString() + " from peer cluster " + peerDesc.getPeerId());
4133            }
4134            if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc,
4135              tableDesc) != 0) {
4136              throw new IllegalArgumentException("Table " + tableName.getNameAsString()
4137                  + " exists in peer cluster " + peerDesc.getPeerId()
4138                  + ", but the table descriptors are not same when compared with source cluster."
4139                  + " Thus can not enable the table's replication switch.");
4140            }
4141          }
4142        }
4143      }
4144    }
4145  }
4146
4147  /**
4148   * Set the table's replication switch if the table's replication switch is already not set.
4149   * @param tableName name of the table
4150   * @param enableRep is replication switch enable or disable
4151   * @throws IOException if a remote or network exception occurs
4152   */
4153  private void setTableRep(final TableName tableName, boolean enableRep) throws IOException {
4154    TableDescriptor tableDesc = getDescriptor(tableName);
4155    if (!tableDesc.matchReplicationScope(enableRep)) {
4156      int scope =
4157          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
4158      modifyTable(TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build());
4159    }
4160  }
4161
4162  @Override
4163  public void clearCompactionQueues(final ServerName sn, final Set<String> queues)
4164    throws IOException, InterruptedException {
4165    if (queues == null || queues.size() == 0) {
4166      throw new IllegalArgumentException("queues cannot be null or empty");
4167    }
4168    final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
4169    Callable<Void> callable = new Callable<Void>() {
4170      @Override
4171      public Void call() throws Exception {
4172        // TODO: There is no timeout on this controller. Set one!
4173        HBaseRpcController controller = rpcControllerFactory.newController();
4174        ClearCompactionQueuesRequest request =
4175                RequestConverter.buildClearCompactionQueuesRequest(queues);
4176        admin.clearCompactionQueues(controller, request);
4177        return null;
4178      }
4179    };
4180    ProtobufUtil.call(callable);
4181  }
4182
4183  @Override
4184  public List<ServerName> clearDeadServers(List<ServerName> servers) throws IOException {
4185    return executeCallable(new MasterCallable<List<ServerName>>(getConnection(),
4186            getRpcControllerFactory()) {
4187      @Override
4188      protected List<ServerName> rpcCall() throws Exception {
4189        ClearDeadServersRequest req = RequestConverter.
4190          buildClearDeadServersRequest(servers == null? Collections.EMPTY_LIST: servers);
4191        return ProtobufUtil.toServerNameList(
4192                master.clearDeadServers(getRpcController(), req).getServerNameList());
4193      }
4194    });
4195  }
4196}