001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
021import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
022import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
023
024import com.google.protobuf.Message;
025import com.google.protobuf.RpcChannel;
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.EnumSet;
031import java.util.HashMap;
032import java.util.List;
033import java.util.Map;
034import java.util.Optional;
035import java.util.Set;
036import java.util.concurrent.CompletableFuture;
037import java.util.concurrent.ConcurrentLinkedQueue;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.atomic.AtomicReference;
040import java.util.function.BiConsumer;
041import java.util.function.Function;
042import java.util.function.Supplier;
043import java.util.regex.Pattern;
044import java.util.stream.Collectors;
045import java.util.stream.Stream;
046import org.apache.commons.io.IOUtils;
047import org.apache.hadoop.conf.Configuration;
048import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
049import org.apache.hadoop.hbase.CacheEvictionStats;
050import org.apache.hadoop.hbase.CacheEvictionStatsAggregator;
051import org.apache.hadoop.hbase.ClusterMetrics;
052import org.apache.hadoop.hbase.ClusterMetrics.Option;
053import org.apache.hadoop.hbase.ClusterMetricsBuilder;
054import org.apache.hadoop.hbase.HConstants;
055import org.apache.hadoop.hbase.HRegionLocation;
056import org.apache.hadoop.hbase.MetaTableAccessor;
057import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
058import org.apache.hadoop.hbase.NamespaceDescriptor;
059import org.apache.hadoop.hbase.RegionLocations;
060import org.apache.hadoop.hbase.RegionMetrics;
061import org.apache.hadoop.hbase.RegionMetricsBuilder;
062import org.apache.hadoop.hbase.ServerName;
063import org.apache.hadoop.hbase.TableExistsException;
064import org.apache.hadoop.hbase.TableName;
065import org.apache.hadoop.hbase.TableNotDisabledException;
066import org.apache.hadoop.hbase.TableNotEnabledException;
067import org.apache.hadoop.hbase.TableNotFoundException;
068import org.apache.hadoop.hbase.UnknownRegionException;
069import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
070import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
071import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
072import org.apache.hadoop.hbase.client.Scan.ReadType;
073import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
074import org.apache.hadoop.hbase.client.replication.TableCFs;
075import org.apache.hadoop.hbase.client.security.SecurityCapability;
076import org.apache.hadoop.hbase.exceptions.DeserializationException;
077import org.apache.hadoop.hbase.ipc.HBaseRpcController;
078import org.apache.hadoop.hbase.quotas.QuotaFilter;
079import org.apache.hadoop.hbase.quotas.QuotaSettings;
080import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
081import org.apache.hadoop.hbase.replication.ReplicationException;
082import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
083import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
084import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
085import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
086import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
087import org.apache.hadoop.hbase.util.Bytes;
088import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
089import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
090import org.apache.yetus.audience.InterfaceAudience;
091import org.slf4j.Logger;
092import org.slf4j.LoggerFactory;
093
094import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
095import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
096import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
097import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
098import org.apache.hbase.thirdparty.io.netty.util.Timeout;
099import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
100
101import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
102import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
103import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
104import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
105import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
106import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
107import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
108import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
109import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
110import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
111import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
112import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
113import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
114import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
115import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
116import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
117import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
147import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
148import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
149import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
150import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
153import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
249import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
250import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
251import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
252import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
253import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
254import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
255import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
256import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
257import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
258import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
259import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
260import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
261import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
262import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
263import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
264
265/**
266 * The implementation of AsyncAdmin.
267 * <p>
268 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
269 * be finished inside the rpc framework thread, which means that the callbacks registered to the
270 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
271 * this class should not try to do time consuming tasks in the callbacks.
272 * @since 2.0.0
273 * @see AsyncHBaseAdmin
274 * @see AsyncConnection#getAdmin()
275 * @see AsyncConnection#getAdminBuilder()
276 */
277@InterfaceAudience.Private
278class RawAsyncHBaseAdmin implements AsyncAdmin {
279  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
280
281  private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class);
282
283  private final AsyncConnectionImpl connection;
284
285  private final HashedWheelTimer retryTimer;
286
287  private final AsyncTable<AdvancedScanResultConsumer> metaTable;
288
289  private final long rpcTimeoutNs;
290
291  private final long operationTimeoutNs;
292
293  private final long pauseNs;
294
295  private final int maxAttempts;
296
297  private final int startLogErrorsCnt;
298
299  private final NonceGenerator ng;
300
301  RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer,
302      AsyncAdminBuilderBase builder) {
303    this.connection = connection;
304    this.retryTimer = retryTimer;
305    this.metaTable = connection.getTable(META_TABLE_NAME);
306    this.rpcTimeoutNs = builder.rpcTimeoutNs;
307    this.operationTimeoutNs = builder.operationTimeoutNs;
308    this.pauseNs = builder.pauseNs;
309    this.maxAttempts = builder.maxAttempts;
310    this.startLogErrorsCnt = builder.startLogErrorsCnt;
311    this.ng = connection.getNonceGenerator();
312  }
313
314  private <T> MasterRequestCallerBuilder<T> newMasterCaller() {
315    return this.connection.callerFactory.<T> masterRequest()
316        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
317        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
318        .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
319        .startLogErrorsCnt(startLogErrorsCnt);
320  }
321
322  private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
323    return this.connection.callerFactory.<T> adminRequest()
324        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
325        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
326        .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
327        .startLogErrorsCnt(startLogErrorsCnt);
328  }
329
330  @FunctionalInterface
331  private interface MasterRpcCall<RESP, REQ> {
332    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
333        RpcCallback<RESP> done);
334  }
335
336  @FunctionalInterface
337  private interface AdminRpcCall<RESP, REQ> {
338    void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
339        RpcCallback<RESP> done);
340  }
341
342  @FunctionalInterface
343  private interface Converter<D, S> {
344    D convert(S src) throws IOException;
345  }
346
347  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
348      MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
349      Converter<RESP, PRESP> respConverter) {
350    CompletableFuture<RESP> future = new CompletableFuture<>();
351    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
352
353      @Override
354      public void run(PRESP resp) {
355        if (controller.failed()) {
356          future.completeExceptionally(controller.getFailed());
357        } else {
358          try {
359            future.complete(respConverter.convert(resp));
360          } catch (IOException e) {
361            future.completeExceptionally(e);
362          }
363        }
364      }
365    });
366    return future;
367  }
368
369  private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
370      AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
371      Converter<RESP, PRESP> respConverter) {
372
373    CompletableFuture<RESP> future = new CompletableFuture<>();
374    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
375
376      @Override
377      public void run(PRESP resp) {
378        if (controller.failed()) {
379          future.completeExceptionally(controller.getFailed());
380        } else {
381          try {
382            future.complete(respConverter.convert(resp));
383          } catch (IOException e) {
384            future.completeExceptionally(e);
385          }
386        }
387      }
388    });
389    return future;
390  }
391
392  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
393      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
394      ProcedureBiConsumer consumer) {
395    CompletableFuture<Long> procFuture =
396      this.<Long> newMasterCaller().action((controller, stub) -> this
397        .<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter)).call();
398    CompletableFuture<Void> future = waitProcedureResult(procFuture);
399    addListener(future, consumer);
400    return future;
401  }
402
403  @FunctionalInterface
404  private interface TableOperator {
405    CompletableFuture<Void> operate(TableName table);
406  }
407
408  @Override
409  public CompletableFuture<Boolean> tableExists(TableName tableName) {
410    if (TableName.isMetaTableName(tableName)) {
411      return CompletableFuture.completedFuture(true);
412    }
413    return AsyncMetaTableAccessor.tableExists(metaTable, tableName);
414  }
415
416  @Override
417  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) {
418    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(null,
419      includeSysTables));
420  }
421
422  /**
423   * {@link #listTableDescriptors(boolean)}
424   */
425  @Override
426  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern,
427      boolean includeSysTables) {
428    Preconditions.checkNotNull(pattern,
429      "pattern is null. If you don't specify a pattern, use listTables(boolean) instead");
430    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(pattern,
431      includeSysTables));
432  }
433
434  private CompletableFuture<List<TableDescriptor>>
435      getTableDescriptors(GetTableDescriptorsRequest request) {
436    return this.<List<TableDescriptor>> newMasterCaller()
437        .action((controller, stub) -> this
438            .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableDescriptor>> call(
439              controller, stub, request, (s, c, req, done) -> s.getTableDescriptors(c, req, done),
440              (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
441        .call();
442  }
443
444  @Override
445  public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) {
446    return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables));
447  }
448
449  @Override
450  public CompletableFuture<List<TableName>>
451      listTableNames(Pattern pattern, boolean includeSysTables) {
452    Preconditions.checkNotNull(pattern,
453        "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead");
454    return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables));
455  }
456
457  private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) {
458    return this
459        .<List<TableName>> newMasterCaller()
460        .action(
461          (controller, stub) -> this
462              .<GetTableNamesRequest, GetTableNamesResponse, List<TableName>> call(controller,
463                stub, request, (s, c, req, done) -> s.getTableNames(c, req, done),
464                (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))).call();
465  }
466
467  @Override
468  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) {
469    return this.<List<TableDescriptor>> newMasterCaller().action((controller, stub) -> this
470        .<ListTableDescriptorsByNamespaceRequest, ListTableDescriptorsByNamespaceResponse,
471        List<TableDescriptor>> call(
472          controller, stub,
473          ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
474          (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done),
475          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
476        .call();
477  }
478
479  @Override
480  public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) {
481    return this.<List<TableName>> newMasterCaller().action((controller, stub) -> this
482        .<ListTableNamesByNamespaceRequest, ListTableNamesByNamespaceResponse,
483        List<TableName>> call(
484          controller, stub,
485          ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
486          (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done),
487          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList())))
488        .call();
489  }
490
491  @Override
492  public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
493    CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
494    addListener(this.<List<TableSchema>> newMasterCaller()
495      .action((controller, stub) -> this
496        .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
497          controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName),
498          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
499          (resp) -> resp.getTableSchemaList()))
500      .call(), (tableSchemas, error) -> {
501        if (error != null) {
502          future.completeExceptionally(error);
503          return;
504        }
505        if (!tableSchemas.isEmpty()) {
506          future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
507        } else {
508          future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
509        }
510      });
511    return future;
512  }
513
514  @Override
515  public CompletableFuture<Void> createTable(TableDescriptor desc) {
516    return createTable(desc.getTableName(),
517      RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce()));
518  }
519
520  @Override
521  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
522      int numRegions) {
523    try {
524      return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
525    } catch (IllegalArgumentException e) {
526      return failedFuture(e);
527    }
528  }
529
530  @Override
531  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
532    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
533        + " use createTable(TableDescriptor) instead");
534    try {
535      verifySplitKeys(splitKeys);
536      return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc,
537        splitKeys, ng.getNonceGroup(), ng.newNonce()));
538    } catch (IllegalArgumentException e) {
539      return failedFuture(e);
540    }
541  }
542
543  private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
544    Preconditions.checkNotNull(tableName, "table name is null");
545    return this.<CreateTableRequest, CreateTableResponse> procedureCall(request,
546      (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
547      new CreateTableProcedureBiConsumer(tableName));
548  }
549
550  @Override
551  public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
552    return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(
553      RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
554        ng.newNonce()), (s, c, req, done) -> s.modifyTable(c, req, done),
555      (resp) -> resp.getProcId(), new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
556  }
557
558  @Override
559  public CompletableFuture<Void> deleteTable(TableName tableName) {
560    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(RequestConverter
561        .buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
562      (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
563      new DeleteTableProcedureBiConsumer(tableName));
564  }
565
566  @Override
567  public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
568    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(
569      RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
570        ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done),
571      (resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(tableName));
572  }
573
574  @Override
575  public CompletableFuture<Void> enableTable(TableName tableName) {
576    return this.<EnableTableRequest, EnableTableResponse> procedureCall(RequestConverter
577        .buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
578      (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
579      new EnableTableProcedureBiConsumer(tableName));
580  }
581
582  @Override
583  public CompletableFuture<Void> disableTable(TableName tableName) {
584    return this.<DisableTableRequest, DisableTableResponse> procedureCall(RequestConverter
585        .buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
586      (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
587      new DisableTableProcedureBiConsumer(tableName));
588  }
589
590  @Override
591  public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
592    if (TableName.isMetaTableName(tableName)) {
593      return CompletableFuture.completedFuture(true);
594    }
595    CompletableFuture<Boolean> future = new CompletableFuture<>();
596    addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (state, error) -> {
597      if (error != null) {
598        future.completeExceptionally(error);
599        return;
600      }
601      if (state.isPresent()) {
602        future.complete(state.get().inStates(TableState.State.ENABLED));
603      } else {
604        future.completeExceptionally(new TableNotFoundException(tableName));
605      }
606    });
607    return future;
608  }
609
610  @Override
611  public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
612    if (TableName.isMetaTableName(tableName)) {
613      return CompletableFuture.completedFuture(false);
614    }
615    CompletableFuture<Boolean> future = new CompletableFuture<>();
616    addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (state, error) -> {
617      if (error != null) {
618        future.completeExceptionally(error);
619        return;
620      }
621      if (state.isPresent()) {
622        future.complete(state.get().inStates(TableState.State.DISABLED));
623      } else {
624        future.completeExceptionally(new TableNotFoundException(tableName));
625      }
626    });
627    return future;
628  }
629
630  @Override
631  public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
632    return isTableAvailable(tableName, Optional.empty());
633  }
634
635  @Override
636  public CompletableFuture<Boolean> isTableAvailable(TableName tableName, byte[][] splitKeys) {
637    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
638        + " use isTableAvailable(TableName) instead");
639    return isTableAvailable(tableName, Optional.of(splitKeys));
640  }
641
642  private CompletableFuture<Boolean> isTableAvailable(TableName tableName,
643      Optional<byte[][]> splitKeys) {
644    if (TableName.isMetaTableName(tableName)) {
645      return connection.registry.getMetaRegionLocation().thenApply(locs -> Stream
646        .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
647    }
648    CompletableFuture<Boolean> future = new CompletableFuture<>();
649    addListener(isTableEnabled(tableName), (enabled, error) -> {
650      if (error != null) {
651        if (error instanceof TableNotFoundException) {
652          future.complete(false);
653        } else {
654          future.completeExceptionally(error);
655        }
656        return;
657      }
658      if (!enabled) {
659        future.complete(false);
660      } else {
661        addListener(
662          AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)),
663          (locations, error1) -> {
664            if (error1 != null) {
665              future.completeExceptionally(error1);
666              return;
667            }
668            List<HRegionLocation> notDeployedRegions = locations.stream()
669              .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
670            if (notDeployedRegions.size() > 0) {
671              if (LOG.isDebugEnabled()) {
672                LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
673              }
674              future.complete(false);
675              return;
676            }
677
678            Optional<Boolean> available =
679              splitKeys.map(keys -> compareRegionsWithSplitKeys(locations, keys));
680            future.complete(available.orElse(true));
681          });
682      }
683    });
684    return future;
685  }
686
687  private boolean compareRegionsWithSplitKeys(List<HRegionLocation> locations, byte[][] splitKeys) {
688    int regionCount = 0;
689    for (HRegionLocation location : locations) {
690      RegionInfo info = location.getRegion();
691      if (Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
692        regionCount++;
693        continue;
694      }
695      for (byte[] splitKey : splitKeys) {
696        // Just check if the splitkey is available
697        if (Bytes.equals(info.getStartKey(), splitKey)) {
698          regionCount++;
699          break;
700        }
701      }
702    }
703    return regionCount == splitKeys.length + 1;
704  }
705
706  @Override
707  public CompletableFuture<Void> addColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamily) {
708    return this.<AddColumnRequest, AddColumnResponse> procedureCall(
709      RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
710        ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
711      new AddColumnFamilyProcedureBiConsumer(tableName));
712  }
713
714  @Override
715  public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
716    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(
717      RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
718        ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done),
719      (resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(tableName));
720  }
721
722  @Override
723  public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
724      ColumnFamilyDescriptor columnFamily) {
725    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(
726      RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
727        ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done),
728      (resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(tableName));
729  }
730
731  @Override
732  public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
733    return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
734      RequestConverter.buildCreateNamespaceRequest(descriptor),
735      (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
736      new CreateNamespaceProcedureBiConsumer(descriptor.getName()));
737  }
738
739  @Override
740  public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
741    return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
742      RequestConverter.buildModifyNamespaceRequest(descriptor),
743      (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
744      new ModifyNamespaceProcedureBiConsumer(descriptor.getName()));
745  }
746
747  @Override
748  public CompletableFuture<Void> deleteNamespace(String name) {
749    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
750      RequestConverter.buildDeleteNamespaceRequest(name),
751      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
752      new DeleteNamespaceProcedureBiConsumer(name));
753  }
754
755  @Override
756  public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
757    return this
758        .<NamespaceDescriptor> newMasterCaller()
759        .action(
760          (controller, stub) -> this
761              .<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor> call(
762                controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name), (s, c,
763                    req, done) -> s.getNamespaceDescriptor(c, req, done), (resp) -> ProtobufUtil
764                    .toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call();
765  }
766
767  @Override
768  public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
769    return this
770        .<List<NamespaceDescriptor>> newMasterCaller()
771        .action(
772          (controller, stub) -> this
773              .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(
774                controller, stub, ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req,
775                    done) -> s.listNamespaceDescriptors(c, req, done), (resp) -> ProtobufUtil
776                    .toNamespaceDescriptorList(resp))).call();
777  }
778
779  @Override
780  public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
781    return this.<List<RegionInfo>> newAdminCaller()
782        .action((controller, stub) -> this
783            .<GetOnlineRegionRequest, GetOnlineRegionResponse, List<RegionInfo>> adminCall(
784              controller, stub, RequestConverter.buildGetOnlineRegionRequest(),
785              (s, c, req, done) -> s.getOnlineRegion(c, req, done),
786              resp -> ProtobufUtil.getRegionInfos(resp)))
787        .serverName(serverName).call();
788  }
789
790  @Override
791  public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
792    if (tableName.equals(META_TABLE_NAME)) {
793      return connection.getLocator().getRegionLocation(tableName, null, null, operationTimeoutNs)
794          .thenApply(loc -> Collections.singletonList(loc.getRegion()));
795    } else {
796      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName))
797          .thenApply(
798            locs -> locs.stream().map(loc -> loc.getRegion()).collect(Collectors.toList()));
799    }
800  }
801
802  @Override
803  public CompletableFuture<Void> flush(TableName tableName) {
804    CompletableFuture<Void> future = new CompletableFuture<>();
805    addListener(tableExists(tableName), (exists, err) -> {
806      if (err != null) {
807        future.completeExceptionally(err);
808      } else if (!exists) {
809        future.completeExceptionally(new TableNotFoundException(tableName));
810      } else {
811        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
812          if (err2 != null) {
813            future.completeExceptionally(err2);
814          } else if (!tableEnabled) {
815            future.completeExceptionally(new TableNotEnabledException(tableName));
816          } else {
817            addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
818              new HashMap<>()), (ret, err3) -> {
819                if (err3 != null) {
820                  future.completeExceptionally(err3);
821                } else {
822                  future.complete(ret);
823                }
824              });
825          }
826        });
827      }
828    });
829    return future;
830  }
831
832  @Override
833  public CompletableFuture<Void> flushRegion(byte[] regionName) {
834    CompletableFuture<Void> future = new CompletableFuture<>();
835    addListener(getRegionLocation(regionName), (location, err) -> {
836      if (err != null) {
837        future.completeExceptionally(err);
838        return;
839      }
840      ServerName serverName = location.getServerName();
841      if (serverName == null) {
842        future
843          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
844        return;
845      }
846      addListener(flush(serverName, location.getRegion()), (ret, err2) -> {
847        if (err2 != null) {
848          future.completeExceptionally(err2);
849        } else {
850          future.complete(ret);
851        }
852      });
853    });
854    return future;
855  }
856
857  private CompletableFuture<Void> flush(final ServerName serverName, final RegionInfo regionInfo) {
858    return this.<Void> newAdminCaller()
859            .serverName(serverName)
860            .action(
861              (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall(
862                controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo
863                  .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done),
864                resp -> null))
865            .call();
866  }
867
868  @Override
869  public CompletableFuture<Void> flushRegionServer(ServerName sn) {
870    CompletableFuture<Void> future = new CompletableFuture<>();
871    addListener(getRegions(sn), (hRegionInfos, err) -> {
872      if (err != null) {
873        future.completeExceptionally(err);
874        return;
875      }
876      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
877      if (hRegionInfos != null) {
878        hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region)));
879      }
880      addListener(CompletableFuture.allOf(
881        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
882          if (err2 != null) {
883            future.completeExceptionally(err2);
884          } else {
885            future.complete(ret);
886          }
887        });
888    });
889    return future;
890  }
891
892  @Override
893  public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
894    return compact(tableName, null, false, compactType);
895  }
896
897  @Override
898  public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
899      CompactType compactType) {
900    Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
901        + "If you don't specify a columnFamily, use compact(TableName) instead");
902    return compact(tableName, columnFamily, false, compactType);
903  }
904
905  @Override
906  public CompletableFuture<Void> compactRegion(byte[] regionName) {
907    return compactRegion(regionName, null, false);
908  }
909
910  @Override
911  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
912    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
913        + " If you don't specify a columnFamily, use compactRegion(regionName) instead");
914    return compactRegion(regionName, columnFamily, false);
915  }
916
917  @Override
918  public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
919    return compact(tableName, null, true, compactType);
920  }
921
922  @Override
923  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
924      CompactType compactType) {
925    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
926        + "If you don't specify a columnFamily, use compact(TableName) instead");
927    return compact(tableName, columnFamily, true, compactType);
928  }
929
930  @Override
931  public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
932    return compactRegion(regionName, null, true);
933  }
934
935  @Override
936  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
937    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
938        + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead");
939    return compactRegion(regionName, columnFamily, true);
940  }
941
942  @Override
943  public CompletableFuture<Void> compactRegionServer(ServerName sn) {
944    return compactRegionServer(sn, false);
945  }
946
947  @Override
948  public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
949    return compactRegionServer(sn, true);
950  }
951
952  private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
953    CompletableFuture<Void> future = new CompletableFuture<>();
954    addListener(getRegions(sn), (hRegionInfos, err) -> {
955      if (err != null) {
956        future.completeExceptionally(err);
957        return;
958      }
959      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
960      if (hRegionInfos != null) {
961        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
962      }
963      addListener(CompletableFuture.allOf(
964        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
965          if (err2 != null) {
966            future.completeExceptionally(err2);
967          } else {
968            future.complete(ret);
969          }
970        });
971    });
972    return future;
973  }
974
975  private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
976      boolean major) {
977    CompletableFuture<Void> future = new CompletableFuture<>();
978    addListener(getRegionLocation(regionName), (location, err) -> {
979      if (err != null) {
980        future.completeExceptionally(err);
981        return;
982      }
983      ServerName serverName = location.getServerName();
984      if (serverName == null) {
985        future
986          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
987        return;
988      }
989      addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
990        (ret, err2) -> {
991          if (err2 != null) {
992            future.completeExceptionally(err2);
993          } else {
994            future.complete(ret);
995          }
996        });
997    });
998    return future;
999  }
1000
1001  /**
1002   * List all region locations for the specific table.
1003   */
1004  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
1005    if (TableName.META_TABLE_NAME.equals(tableName)) {
1006      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
1007      // For meta table, we use zk to fetch all locations.
1008      AsyncRegistry registry = AsyncRegistryFactory.getRegistry(connection.getConfiguration());
1009      addListener(registry.getMetaRegionLocation(), (metaRegions, err) -> {
1010        if (err != null) {
1011          future.completeExceptionally(err);
1012        } else if (metaRegions == null || metaRegions.isEmpty() ||
1013          metaRegions.getDefaultRegionLocation() == null) {
1014          future.completeExceptionally(new IOException("meta region does not found"));
1015        } else {
1016          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
1017        }
1018        // close the registry.
1019        IOUtils.closeQuietly(registry);
1020      });
1021      return future;
1022    } else {
1023      // For non-meta table, we fetch all locations by scanning hbase:meta table
1024      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName));
1025    }
1026  }
1027
1028  /**
1029   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
1030   */
1031  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
1032      CompactType compactType) {
1033    CompletableFuture<Void> future = new CompletableFuture<>();
1034
1035    switch (compactType) {
1036      case MOB:
1037        addListener(connection.registry.getMasterAddress(), (serverName, err) -> {
1038          if (err != null) {
1039            future.completeExceptionally(err);
1040            return;
1041          }
1042          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
1043          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
1044            if (err2 != null) {
1045              future.completeExceptionally(err2);
1046            } else {
1047              future.complete(ret);
1048            }
1049          });
1050        });
1051        break;
1052      case NORMAL:
1053        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
1054          if (err != null) {
1055            future.completeExceptionally(err);
1056            return;
1057          }
1058          if (locations == null || locations.isEmpty()) {
1059            future.completeExceptionally(new TableNotFoundException(tableName));
1060          }
1061          CompletableFuture<?>[] compactFutures =
1062            locations.stream().filter(l -> l.getRegion() != null)
1063              .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
1064              .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
1065              .toArray(CompletableFuture<?>[]::new);
1066          // future complete unless all of the compact futures are completed.
1067          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
1068            if (err2 != null) {
1069              future.completeExceptionally(err2);
1070            } else {
1071              future.complete(ret);
1072            }
1073          });
1074        });
1075        break;
1076      default:
1077        throw new IllegalArgumentException("Unknown compactType: " + compactType);
1078    }
1079    return future;
1080  }
1081
1082  /**
1083   * Compact the region at specific region server.
1084   */
1085  private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
1086      final boolean major, byte[] columnFamily) {
1087    return this
1088        .<Void> newAdminCaller()
1089        .serverName(sn)
1090        .action(
1091          (controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, Void> adminCall(
1092            controller, stub, RequestConverter.buildCompactRegionRequest(hri.getRegionName(),
1093              major, columnFamily), (s, c, req, done) -> s.compactRegion(c, req, done),
1094            resp -> null)).call();
1095  }
1096
1097  private byte[] toEncodeRegionName(byte[] regionName) {
1098    try {
1099      return RegionInfo.isEncodedRegionName(regionName) ? regionName
1100          : Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
1101    } catch (IOException e) {
1102      return regionName;
1103    }
1104  }
1105
1106  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
1107      CompletableFuture<TableName> result) {
1108    addListener(getRegionLocation(encodeRegionName), (location, err) -> {
1109      if (err != null) {
1110        result.completeExceptionally(err);
1111        return;
1112      }
1113      RegionInfo regionInfo = location.getRegion();
1114      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1115        result.completeExceptionally(
1116          new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
1117        return;
1118      }
1119      if (!tableName.compareAndSet(null, regionInfo.getTable())) {
1120        if (!tableName.get().equals(regionInfo.getTable())) {
1121          // tables of this two region should be same.
1122          result.completeExceptionally(
1123            new IllegalArgumentException("Cannot merge regions from two different tables " +
1124              tableName.get() + " and " + regionInfo.getTable()));
1125        } else {
1126          result.complete(tableName.get());
1127        }
1128      }
1129    });
1130  }
1131
1132  private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[] encodeRegionNameA,
1133      byte[] encodeRegionNameB) {
1134    AtomicReference<TableName> tableNameRef = new AtomicReference<>();
1135    CompletableFuture<TableName> future = new CompletableFuture<>();
1136
1137    checkAndGetTableName(encodeRegionNameA, tableNameRef, future);
1138    checkAndGetTableName(encodeRegionNameB, tableNameRef, future);
1139    return future;
1140  }
1141
1142  @Override
1143  public CompletableFuture<Boolean> mergeSwitch(boolean on) {
1144    return setSplitOrMergeOn(on, MasterSwitchType.MERGE);
1145  }
1146
1147  @Override
1148  public CompletableFuture<Boolean> isMergeEnabled() {
1149    return isSplitOrMergeOn(MasterSwitchType.MERGE);
1150  }
1151
1152  @Override
1153  public CompletableFuture<Boolean> splitSwitch(boolean on) {
1154    return setSplitOrMergeOn(on, MasterSwitchType.SPLIT);
1155  }
1156
1157  @Override
1158  public CompletableFuture<Boolean> isSplitEnabled() {
1159    return isSplitOrMergeOn(MasterSwitchType.SPLIT);
1160  }
1161
1162  private CompletableFuture<Boolean> setSplitOrMergeOn(boolean on, MasterSwitchType switchType) {
1163    SetSplitOrMergeEnabledRequest request =
1164        RequestConverter.buildSetSplitOrMergeEnabledRequest(on, false, switchType);
1165    return this
1166        .<Boolean> newMasterCaller()
1167        .action(
1168          (controller, stub) -> this
1169              .<SetSplitOrMergeEnabledRequest, SetSplitOrMergeEnabledResponse, Boolean> call(
1170                controller, stub, request, (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req,
1171                  done), (resp) -> resp.getPrevValueList().get(0))).call();
1172  }
1173
1174  private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) {
1175    IsSplitOrMergeEnabledRequest request =
1176        RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType);
1177    return this
1178        .<Boolean> newMasterCaller()
1179        .action(
1180          (controller, stub) -> this
1181              .<IsSplitOrMergeEnabledRequest, IsSplitOrMergeEnabledResponse, Boolean> call(
1182                controller, stub, request,
1183                (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done),
1184                (resp) -> resp.getEnabled())).call();
1185  }
1186
1187  @Override
1188  public CompletableFuture<Void> mergeRegions(byte[] nameOfRegionA, byte[] nameOfRegionB,
1189      boolean forcible) {
1190    CompletableFuture<Void> future = new CompletableFuture<>();
1191    final byte[] encodeRegionNameA = toEncodeRegionName(nameOfRegionA);
1192    final byte[] encodeRegionNameB = toEncodeRegionName(nameOfRegionB);
1193
1194    addListener(checkRegionsAndGetTableName(encodeRegionNameA, encodeRegionNameB),
1195      (tableName, err) -> {
1196        if (err != null) {
1197          future.completeExceptionally(err);
1198          return;
1199        }
1200
1201        MergeTableRegionsRequest request = null;
1202        try {
1203          request = RequestConverter.buildMergeTableRegionsRequest(
1204            new byte[][] { encodeRegionNameA, encodeRegionNameB }, forcible, ng.getNonceGroup(),
1205            ng.newNonce());
1206        } catch (DeserializationException e) {
1207          future.completeExceptionally(e);
1208          return;
1209        }
1210
1211        addListener(
1212          this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(request,
1213            (s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(),
1214            new MergeTableRegionProcedureBiConsumer(tableName)),
1215          (ret, err2) -> {
1216            if (err2 != null) {
1217              future.completeExceptionally(err2);
1218            } else {
1219              future.complete(ret);
1220            }
1221          });
1222      });
1223    return future;
1224  }
1225
1226  @Override
1227  public CompletableFuture<Void> split(TableName tableName) {
1228    CompletableFuture<Void> future = new CompletableFuture<>();
1229    addListener(tableExists(tableName), (exist, error) -> {
1230      if (error != null) {
1231        future.completeExceptionally(error);
1232        return;
1233      }
1234      if (!exist) {
1235        future.completeExceptionally(new TableNotFoundException(tableName));
1236        return;
1237      }
1238      addListener(
1239        metaTable
1240          .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
1241            .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION))
1242            .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION))),
1243        (results, err2) -> {
1244          if (err2 != null) {
1245            future.completeExceptionally(err2);
1246            return;
1247          }
1248          if (results != null && !results.isEmpty()) {
1249            List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
1250            for (Result r : results) {
1251              if (r.isEmpty() || MetaTableAccessor.getRegionInfo(r) == null) {
1252                continue;
1253              }
1254              RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
1255              if (rl != null) {
1256                for (HRegionLocation h : rl.getRegionLocations()) {
1257                  if (h != null && h.getServerName() != null) {
1258                    RegionInfo hri = h.getRegion();
1259                    if (hri == null || hri.isSplitParent() ||
1260                      hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1261                      continue;
1262                    }
1263                    splitFutures.add(split(hri, null));
1264                  }
1265                }
1266              }
1267            }
1268            addListener(
1269              CompletableFuture
1270                .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
1271              (ret, exception) -> {
1272                if (exception != null) {
1273                  future.completeExceptionally(exception);
1274                  return;
1275                }
1276                future.complete(ret);
1277              });
1278          } else {
1279            future.complete(null);
1280          }
1281        });
1282    });
1283    return future;
1284  }
1285
1286  @Override
1287  public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
1288    CompletableFuture<Void> result = new CompletableFuture<>();
1289    if (splitPoint == null) {
1290      return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
1291    }
1292    addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true),
1293      (loc, err) -> {
1294        if (err != null) {
1295          result.completeExceptionally(err);
1296        } else if (loc == null || loc.getRegion() == null) {
1297          result.completeExceptionally(new IllegalArgumentException(
1298            "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
1299        } else {
1300          addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
1301            if (err2 != null) {
1302              result.completeExceptionally(err2);
1303            } else {
1304              result.complete(ret);
1305            }
1306
1307          });
1308        }
1309      });
1310    return result;
1311  }
1312
1313  @Override
1314  public CompletableFuture<Void> splitRegion(byte[] regionName) {
1315    CompletableFuture<Void> future = new CompletableFuture<>();
1316    addListener(getRegionLocation(regionName), (location, err) -> {
1317      if (err != null) {
1318        future.completeExceptionally(err);
1319        return;
1320      }
1321      RegionInfo regionInfo = location.getRegion();
1322      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1323        future
1324          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
1325            "Replicas are auto-split when their primary is split."));
1326        return;
1327      }
1328      ServerName serverName = location.getServerName();
1329      if (serverName == null) {
1330        future
1331          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1332        return;
1333      }
1334      addListener(split(regionInfo, null), (ret, err2) -> {
1335        if (err2 != null) {
1336          future.completeExceptionally(err2);
1337        } else {
1338          future.complete(ret);
1339        }
1340      });
1341    });
1342    return future;
1343  }
1344
1345  @Override
1346  public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) {
1347    Preconditions.checkNotNull(splitPoint,
1348      "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
1349    CompletableFuture<Void> future = new CompletableFuture<>();
1350    addListener(getRegionLocation(regionName), (location, err) -> {
1351      if (err != null) {
1352        future.completeExceptionally(err);
1353        return;
1354      }
1355      RegionInfo regionInfo = location.getRegion();
1356      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1357        future
1358          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
1359            "Replicas are auto-split when their primary is split."));
1360        return;
1361      }
1362      ServerName serverName = location.getServerName();
1363      if (serverName == null) {
1364        future
1365          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1366        return;
1367      }
1368      if (regionInfo.getStartKey() != null &&
1369        Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0) {
1370        future.completeExceptionally(
1371          new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
1372        return;
1373      }
1374      addListener(split(regionInfo, splitPoint), (ret, err2) -> {
1375        if (err2 != null) {
1376          future.completeExceptionally(err2);
1377        } else {
1378          future.complete(ret);
1379        }
1380      });
1381    });
1382    return future;
1383  }
1384
1385  private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
1386    CompletableFuture<Void> future = new CompletableFuture<>();
1387    TableName tableName = hri.getTable();
1388    SplitTableRegionRequest request = null;
1389    try {
1390      request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
1391        ng.newNonce());
1392    } catch (DeserializationException e) {
1393      future.completeExceptionally(e);
1394      return future;
1395    }
1396
1397    addListener(this.<SplitTableRegionRequest, SplitTableRegionResponse> procedureCall(request,
1398      (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(),
1399      new SplitTableRegionProcedureBiConsumer(tableName)), (ret, err2) -> {
1400        if (err2 != null) {
1401          future.completeExceptionally(err2);
1402        } else {
1403          future.complete(ret);
1404        }
1405      });
1406    return future;
1407  }
1408
1409  @Override
1410  public CompletableFuture<Void> assign(byte[] regionName) {
1411    CompletableFuture<Void> future = new CompletableFuture<>();
1412    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1413      if (err != null) {
1414        future.completeExceptionally(err);
1415        return;
1416      }
1417      addListener(this.<Void> newMasterCaller()
1418        .action(((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
1419          controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
1420          (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)))
1421        .call(), (ret, err2) -> {
1422          if (err2 != null) {
1423            future.completeExceptionally(err2);
1424          } else {
1425            future.complete(ret);
1426          }
1427        });
1428    });
1429    return future;
1430  }
1431
1432  @Override
1433  public CompletableFuture<Void> unassign(byte[] regionName, boolean forcible) {
1434    CompletableFuture<Void> future = new CompletableFuture<>();
1435    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1436      if (err != null) {
1437        future.completeExceptionally(err);
1438        return;
1439      }
1440      addListener(
1441        this.<Void> newMasterCaller()
1442          .action(((controller, stub) -> this
1443            .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
1444              RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible),
1445              (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)))
1446          .call(),
1447        (ret, err2) -> {
1448          if (err2 != null) {
1449            future.completeExceptionally(err2);
1450          } else {
1451            future.complete(ret);
1452          }
1453        });
1454    });
1455    return future;
1456  }
1457
1458  @Override
1459  public CompletableFuture<Void> offline(byte[] regionName) {
1460    CompletableFuture<Void> future = new CompletableFuture<>();
1461    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1462      if (err != null) {
1463        future.completeExceptionally(err);
1464        return;
1465      }
1466      addListener(
1467        this.<Void> newMasterCaller()
1468          .action(((controller, stub) -> this
1469            .<OfflineRegionRequest, OfflineRegionResponse, Void> call(controller, stub,
1470              RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
1471              (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)))
1472          .call(),
1473        (ret, err2) -> {
1474          if (err2 != null) {
1475            future.completeExceptionally(err2);
1476          } else {
1477            future.complete(ret);
1478          }
1479        });
1480    });
1481    return future;
1482  }
1483
1484  @Override
1485  public CompletableFuture<Void> move(byte[] regionName) {
1486    CompletableFuture<Void> future = new CompletableFuture<>();
1487    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1488      if (err != null) {
1489        future.completeExceptionally(err);
1490        return;
1491      }
1492      addListener(
1493        moveRegion(
1494          RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
1495        (ret, err2) -> {
1496          if (err2 != null) {
1497            future.completeExceptionally(err2);
1498          } else {
1499            future.complete(ret);
1500          }
1501        });
1502    });
1503    return future;
1504  }
1505
1506  @Override
1507  public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) {
1508    Preconditions.checkNotNull(destServerName,
1509      "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
1510    CompletableFuture<Void> future = new CompletableFuture<>();
1511    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1512      if (err != null) {
1513        future.completeExceptionally(err);
1514        return;
1515      }
1516      addListener(moveRegion(RequestConverter
1517        .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
1518        (ret, err2) -> {
1519          if (err2 != null) {
1520            future.completeExceptionally(err2);
1521          } else {
1522            future.complete(ret);
1523          }
1524        });
1525    });
1526    return future;
1527  }
1528
1529  private CompletableFuture<Void> moveRegion(MoveRegionRequest request) {
1530    return this
1531        .<Void> newMasterCaller()
1532        .action(
1533          (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
1534            stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null)).call();
1535  }
1536
1537  @Override
1538  public CompletableFuture<Void> setQuota(QuotaSettings quota) {
1539    return this
1540        .<Void> newMasterCaller()
1541        .action(
1542          (controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
1543            stub, QuotaSettings.buildSetQuotaRequestProto(quota),
1544            (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)).call();
1545  }
1546
1547  @Override
1548  public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
1549    CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
1550    Scan scan = QuotaTableUtil.makeScan(filter);
1551    this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build()
1552        .scan(scan, new AdvancedScanResultConsumer() {
1553          List<QuotaSettings> settings = new ArrayList<>();
1554
1555          @Override
1556          public void onNext(Result[] results, ScanController controller) {
1557            for (Result result : results) {
1558              try {
1559                QuotaTableUtil.parseResultToCollection(result, settings);
1560              } catch (IOException e) {
1561                controller.terminate();
1562                future.completeExceptionally(e);
1563              }
1564            }
1565          }
1566
1567          @Override
1568          public void onError(Throwable error) {
1569            future.completeExceptionally(error);
1570          }
1571
1572          @Override
1573          public void onComplete() {
1574            future.complete(settings);
1575          }
1576        });
1577    return future;
1578  }
1579
1580  @Override
1581  public CompletableFuture<Void> addReplicationPeer(String peerId,
1582      ReplicationPeerConfig peerConfig, boolean enabled) {
1583    return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
1584      RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
1585      (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1586      new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
1587  }
1588
1589  @Override
1590  public CompletableFuture<Void> removeReplicationPeer(String peerId) {
1591    return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall(
1592      RequestConverter.buildRemoveReplicationPeerRequest(peerId),
1593      (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1594      new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER"));
1595  }
1596
1597  @Override
1598  public CompletableFuture<Void> enableReplicationPeer(String peerId) {
1599    return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall(
1600      RequestConverter.buildEnableReplicationPeerRequest(peerId),
1601      (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1602      new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
1603  }
1604
1605  @Override
1606  public CompletableFuture<Void> disableReplicationPeer(String peerId) {
1607    return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall(
1608      RequestConverter.buildDisableReplicationPeerRequest(peerId),
1609      (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1610      new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
1611  }
1612
1613  @Override
1614  public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
1615    return this
1616        .<ReplicationPeerConfig> newMasterCaller()
1617        .action(
1618          (controller, stub) -> this
1619              .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(
1620                controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId), (
1621                    s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
1622                (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))).call();
1623  }
1624
1625  @Override
1626  public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
1627      ReplicationPeerConfig peerConfig) {
1628    return this
1629        .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse> procedureCall(
1630          RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
1631          (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
1632          (resp) -> resp.getProcId(),
1633          new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
1634  }
1635
1636  @Override
1637  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
1638      Map<TableName, List<String>> tableCfs) {
1639    if (tableCfs == null) {
1640      return failedFuture(new ReplicationException("tableCfs is null"));
1641    }
1642
1643    CompletableFuture<Void> future = new CompletableFuture<Void>();
1644    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1645      if (!completeExceptionally(future, error)) {
1646        ReplicationPeerConfig newPeerConfig =
1647          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
1648        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1649          if (!completeExceptionally(future, error)) {
1650            future.complete(result);
1651          }
1652        });
1653      }
1654    });
1655    return future;
1656  }
1657
1658  @Override
1659  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
1660      Map<TableName, List<String>> tableCfs) {
1661    if (tableCfs == null) {
1662      return failedFuture(new ReplicationException("tableCfs is null"));
1663    }
1664
1665    CompletableFuture<Void> future = new CompletableFuture<Void>();
1666    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1667      if (!completeExceptionally(future, error)) {
1668        ReplicationPeerConfig newPeerConfig = null;
1669        try {
1670          newPeerConfig = ReplicationPeerConfigUtil
1671            .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
1672        } catch (ReplicationException e) {
1673          future.completeExceptionally(e);
1674          return;
1675        }
1676        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1677          if (!completeExceptionally(future, error)) {
1678            future.complete(result);
1679          }
1680        });
1681      }
1682    });
1683    return future;
1684  }
1685
1686  @Override
1687  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() {
1688    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null));
1689  }
1690
1691  @Override
1692  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) {
1693    Preconditions.checkNotNull(pattern,
1694      "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead");
1695    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern));
1696  }
1697
1698  private CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(
1699      ListReplicationPeersRequest request) {
1700    return this
1701        .<List<ReplicationPeerDescription>> newMasterCaller()
1702        .action(
1703          (controller, stub) -> this
1704              .<ListReplicationPeersRequest, ListReplicationPeersResponse, List<ReplicationPeerDescription>> call(
1705                controller,
1706                stub,
1707                request,
1708                (s, c, req, done) -> s.listReplicationPeers(c, req, done),
1709                (resp) -> resp.getPeerDescList().stream()
1710                    .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
1711                    .collect(Collectors.toList()))).call();
1712  }
1713
1714  @Override
1715  public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
1716    CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
1717    addListener(listTableDescriptors(), (tables, error) -> {
1718      if (!completeExceptionally(future, error)) {
1719        List<TableCFs> replicatedTableCFs = new ArrayList<>();
1720        tables.forEach(table -> {
1721          Map<String, Integer> cfs = new HashMap<>();
1722          Stream.of(table.getColumnFamilies())
1723            .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
1724            .forEach(column -> {
1725              cfs.put(column.getNameAsString(), column.getScope());
1726            });
1727          if (!cfs.isEmpty()) {
1728            replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
1729          }
1730        });
1731        future.complete(replicatedTableCFs);
1732      }
1733    });
1734    return future;
1735  }
1736
1737  @Override
1738  public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
1739    SnapshotProtos.SnapshotDescription snapshot =
1740      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
1741    try {
1742      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
1743    } catch (IllegalArgumentException e) {
1744      return failedFuture(e);
1745    }
1746    CompletableFuture<Void> future = new CompletableFuture<>();
1747    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
1748    addListener(this.<Long> newMasterCaller()
1749      .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
1750        stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
1751        resp -> resp.getExpectedTimeout()))
1752      .call(), (expectedTimeout, err) -> {
1753        if (err != null) {
1754          future.completeExceptionally(err);
1755          return;
1756        }
1757        TimerTask pollingTask = new TimerTask() {
1758          int tries = 0;
1759          long startTime = EnvironmentEdgeManager.currentTime();
1760          long endTime = startTime + expectedTimeout;
1761          long maxPauseTime = expectedTimeout / maxAttempts;
1762
1763          @Override
1764          public void run(Timeout timeout) throws Exception {
1765            if (EnvironmentEdgeManager.currentTime() < endTime) {
1766              addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> {
1767                if (err2 != null) {
1768                  future.completeExceptionally(err2);
1769                } else if (done) {
1770                  future.complete(null);
1771                } else {
1772                  // retry again after pauseTime.
1773                  long pauseTime =
1774                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
1775                  pauseTime = Math.min(pauseTime, maxPauseTime);
1776                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
1777                    TimeUnit.MILLISECONDS);
1778                }
1779              });
1780            } else {
1781              future.completeExceptionally(
1782                new SnapshotCreationException("Snapshot '" + snapshot.getName() +
1783                  "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshotDesc));
1784            }
1785          }
1786        };
1787        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
1788      });
1789    return future;
1790  }
1791
1792  @Override
1793  public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
1794    return this
1795        .<Boolean> newMasterCaller()
1796        .action(
1797          (controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> call(
1798            controller,
1799            stub,
1800            IsSnapshotDoneRequest.newBuilder()
1801                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
1802                req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())).call();
1803  }
1804
1805  @Override
1806  public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
1807    boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
1808      HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
1809      HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
1810    return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
1811  }
1812
1813  @Override
1814  public CompletableFuture<Void> restoreSnapshot(String snapshotName,
1815      boolean takeFailSafeSnapshot) {
1816    CompletableFuture<Void> future = new CompletableFuture<>();
1817    addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
1818      if (err != null) {
1819        future.completeExceptionally(err);
1820        return;
1821      }
1822      TableName tableName = null;
1823      if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
1824        for (SnapshotDescription snap : snapshotDescriptions) {
1825          if (snap.getName().equals(snapshotName)) {
1826            tableName = snap.getTableName();
1827            break;
1828          }
1829        }
1830      }
1831      if (tableName == null) {
1832        future.completeExceptionally(new RestoreSnapshotException(
1833          "Unable to find the table name for snapshot=" + snapshotName));
1834        return;
1835      }
1836      final TableName finalTableName = tableName;
1837      addListener(tableExists(finalTableName), (exists, err2) -> {
1838        if (err2 != null) {
1839          future.completeExceptionally(err2);
1840        } else if (!exists) {
1841          // if table does not exist, then just clone snapshot into new table.
1842          completeConditionalOnFuture(future,
1843            internalRestoreSnapshot(snapshotName, finalTableName));
1844        } else {
1845          addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
1846            if (err4 != null) {
1847              future.completeExceptionally(err4);
1848            } else if (!disabled) {
1849              future.completeExceptionally(new TableNotDisabledException(finalTableName));
1850            } else {
1851              completeConditionalOnFuture(future,
1852                restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot));
1853            }
1854          });
1855        }
1856      });
1857    });
1858    return future;
1859  }
1860
1861  private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
1862      boolean takeFailSafeSnapshot) {
1863    if (takeFailSafeSnapshot) {
1864      CompletableFuture<Void> future = new CompletableFuture<>();
1865      // Step.1 Take a snapshot of the current state
1866      String failSafeSnapshotSnapshotNameFormat =
1867        this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
1868          HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
1869      final String failSafeSnapshotSnapshotName =
1870        failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
1871          .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
1872          .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
1873      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
1874      addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
1875        if (err != null) {
1876          future.completeExceptionally(err);
1877        } else {
1878          // Step.2 Restore snapshot
1879          addListener(internalRestoreSnapshot(snapshotName, tableName), (void2, err2) -> {
1880            if (err2 != null) {
1881              // Step.3.a Something went wrong during the restore and try to rollback.
1882              addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName),
1883                (void3, err3) -> {
1884                  if (err3 != null) {
1885                    future.completeExceptionally(err3);
1886                  } else {
1887                    String msg =
1888                      "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" +
1889                        failSafeSnapshotSnapshotName + " succeeded.";
1890                    future.completeExceptionally(new RestoreSnapshotException(msg));
1891                  }
1892                });
1893            } else {
1894              // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
1895              LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
1896              addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
1897                if (err3 != null) {
1898                  LOG.error(
1899                    "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
1900                    err3);
1901                  future.completeExceptionally(err3);
1902                } else {
1903                  future.complete(ret3);
1904                }
1905              });
1906            }
1907          });
1908        }
1909      });
1910      return future;
1911    } else {
1912      return internalRestoreSnapshot(snapshotName, tableName);
1913    }
1914  }
1915
1916  private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
1917      CompletableFuture<T> parentFuture) {
1918    addListener(parentFuture, (res, err) -> {
1919      if (err != null) {
1920        dependentFuture.completeExceptionally(err);
1921      } else {
1922        dependentFuture.complete(res);
1923      }
1924    });
1925  }
1926
1927  @Override
1928  public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName) {
1929    CompletableFuture<Void> future = new CompletableFuture<>();
1930    addListener(tableExists(tableName), (exists, err) -> {
1931      if (err != null) {
1932        future.completeExceptionally(err);
1933      } else if (exists) {
1934        future.completeExceptionally(new TableExistsException(tableName));
1935      } else {
1936        completeConditionalOnFuture(future, internalRestoreSnapshot(snapshotName, tableName));
1937      }
1938    });
1939    return future;
1940  }
1941
1942  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName) {
1943    SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
1944        .setName(snapshotName).setTable(tableName.getNameAsString()).build();
1945    try {
1946      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
1947    } catch (IllegalArgumentException e) {
1948      return failedFuture(e);
1949    }
1950    return waitProcedureResult(this
1951        .<Long> newMasterCaller()
1952        .action(
1953          (controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> call(
1954            controller, stub, RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot)
1955                .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build(), (s, c, req,
1956                done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId())).call());
1957  }
1958
1959  @Override
1960  public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
1961    return getCompletedSnapshots(null);
1962  }
1963
1964  @Override
1965  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
1966    Preconditions.checkNotNull(pattern,
1967      "pattern is null. If you don't specify a pattern, use listSnapshots() instead");
1968    return getCompletedSnapshots(pattern);
1969  }
1970
1971  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) {
1972    return this.<List<SnapshotDescription>> newMasterCaller().action((controller, stub) -> this
1973        .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, List<SnapshotDescription>>
1974        call(controller, stub, GetCompletedSnapshotsRequest.newBuilder().build(),
1975          (s, c, req, done) -> s.getCompletedSnapshots(c, req, done),
1976          resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern)))
1977        .call();
1978  }
1979
1980  @Override
1981  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) {
1982    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
1983        + " If you don't specify a tableNamePattern, use listSnapshots() instead");
1984    return getCompletedSnapshots(tableNamePattern, null);
1985  }
1986
1987  @Override
1988  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
1989      Pattern snapshotNamePattern) {
1990    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
1991        + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead");
1992    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
1993        + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead");
1994    return getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
1995  }
1996
1997  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(
1998      Pattern tableNamePattern, Pattern snapshotNamePattern) {
1999    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
2000    addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
2001      if (err != null) {
2002        future.completeExceptionally(err);
2003        return;
2004      }
2005      if (tableNames == null || tableNames.size() <= 0) {
2006        future.complete(Collections.emptyList());
2007        return;
2008      }
2009      addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
2010        if (err2 != null) {
2011          future.completeExceptionally(err2);
2012          return;
2013        }
2014        if (snapshotDescList == null || snapshotDescList.isEmpty()) {
2015          future.complete(Collections.emptyList());
2016          return;
2017        }
2018        future.complete(snapshotDescList.stream()
2019          .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
2020          .collect(Collectors.toList()));
2021      });
2022    });
2023    return future;
2024  }
2025
2026  @Override
2027  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
2028    return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
2029  }
2030
2031  @Override
2032  public CompletableFuture<Void> deleteSnapshots() {
2033    return internalDeleteSnapshots(null, null);
2034  }
2035
2036  @Override
2037  public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
2038    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2039        + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead");
2040    return internalDeleteSnapshots(null, snapshotNamePattern);
2041  }
2042
2043  @Override
2044  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) {
2045    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2046        + " If you don't specify a tableNamePattern, use deleteSnapshots() instead");
2047    return internalDeleteSnapshots(tableNamePattern, null);
2048  }
2049
2050  @Override
2051  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
2052      Pattern snapshotNamePattern) {
2053    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2054        + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead");
2055    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2056        + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead");
2057    return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern);
2058  }
2059
2060  private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern,
2061      Pattern snapshotNamePattern) {
2062    CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture;
2063    if (tableNamePattern == null) {
2064      listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern);
2065    } else {
2066      listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2067    }
2068    CompletableFuture<Void> future = new CompletableFuture<>();
2069    addListener(listSnapshotsFuture, ((snapshotDescriptions, err) -> {
2070      if (err != null) {
2071        future.completeExceptionally(err);
2072        return;
2073      }
2074      if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
2075        future.complete(null);
2076        return;
2077      }
2078      addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
2079        .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
2080          if (e != null) {
2081            future.completeExceptionally(e);
2082          } else {
2083            future.complete(v);
2084          }
2085        });
2086    }));
2087    return future;
2088  }
2089
2090  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
2091    return this
2092        .<Void> newMasterCaller()
2093        .action(
2094          (controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
2095            controller,
2096            stub,
2097            DeleteSnapshotRequest.newBuilder()
2098                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
2099                req, done) -> s.deleteSnapshot(c, req, done), resp -> null)).call();
2100  }
2101
2102  @Override
2103  public CompletableFuture<Void> execProcedure(String signature, String instance,
2104      Map<String, String> props) {
2105    CompletableFuture<Void> future = new CompletableFuture<>();
2106    ProcedureDescription procDesc =
2107      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2108    addListener(this.<Long> newMasterCaller()
2109      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
2110        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
2111        (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
2112      .call(), (expectedTimeout, err) -> {
2113        if (err != null) {
2114          future.completeExceptionally(err);
2115          return;
2116        }
2117        TimerTask pollingTask = new TimerTask() {
2118          int tries = 0;
2119          long startTime = EnvironmentEdgeManager.currentTime();
2120          long endTime = startTime + expectedTimeout;
2121          long maxPauseTime = expectedTimeout / maxAttempts;
2122
2123          @Override
2124          public void run(Timeout timeout) throws Exception {
2125            if (EnvironmentEdgeManager.currentTime() < endTime) {
2126              addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
2127                if (err2 != null) {
2128                  future.completeExceptionally(err2);
2129                  return;
2130                }
2131                if (done) {
2132                  future.complete(null);
2133                } else {
2134                  // retry again after pauseTime.
2135                  long pauseTime =
2136                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2137                  pauseTime = Math.min(pauseTime, maxPauseTime);
2138                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
2139                    TimeUnit.MICROSECONDS);
2140                }
2141              });
2142            } else {
2143              future.completeExceptionally(new IOException("Procedure '" + signature + " : " +
2144                instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
2145            }
2146          }
2147        };
2148        // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
2149        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2150      });
2151    return future;
2152  }
2153
2154  @Override
2155  public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance,
2156      Map<String, String> props) {
2157    ProcedureDescription proDesc =
2158        ProtobufUtil.buildProcedureDescription(signature, instance, props);
2159    return this.<byte[]> newMasterCaller()
2160        .action(
2161          (controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
2162            controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
2163            (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
2164            resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
2165        .call();
2166  }
2167
2168  @Override
2169  public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
2170      Map<String, String> props) {
2171    ProcedureDescription proDesc =
2172        ProtobufUtil.buildProcedureDescription(signature, instance, props);
2173    return this.<Boolean> newMasterCaller()
2174        .action((controller, stub) -> this
2175            .<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(controller, stub,
2176              IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
2177              (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
2178        .call();
2179  }
2180
2181  @Override
2182  public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
2183    return this.<Boolean> newMasterCaller().action(
2184      (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
2185        controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
2186        (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
2187        .call();
2188  }
2189
2190  @Override
2191  public CompletableFuture<String> getProcedures() {
2192    return this
2193        .<String> newMasterCaller()
2194        .action(
2195          (controller, stub) -> this
2196              .<GetProceduresRequest, GetProceduresResponse, String> call(
2197                controller, stub, GetProceduresRequest.newBuilder().build(),
2198                (s, c, req, done) -> s.getProcedures(c, req, done),
2199                resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList()))).call();
2200  }
2201
2202  @Override
2203  public CompletableFuture<String> getLocks() {
2204    return this
2205        .<String> newMasterCaller()
2206        .action(
2207          (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(
2208            controller, stub, GetLocksRequest.newBuilder().build(),
2209            (s, c, req, done) -> s.getLocks(c, req, done),
2210            resp -> ProtobufUtil.toLockJson(resp.getLockList()))).call();
2211  }
2212
2213  @Override
2214  public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers, boolean offload) {
2215    return this.<Void> newMasterCaller()
2216        .action((controller, stub) -> this
2217          .<DecommissionRegionServersRequest, DecommissionRegionServersResponse, Void> call(
2218            controller, stub, RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
2219            (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
2220        .call();
2221  }
2222
2223  @Override
2224  public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
2225    return this.<List<ServerName>> newMasterCaller()
2226        .action((controller, stub) -> this
2227          .<ListDecommissionedRegionServersRequest, ListDecommissionedRegionServersResponse,
2228            List<ServerName>> call(
2229              controller, stub, ListDecommissionedRegionServersRequest.newBuilder().build(),
2230              (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
2231              resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
2232                  .collect(Collectors.toList())))
2233        .call();
2234  }
2235
2236  @Override
2237  public CompletableFuture<Void> recommissionRegionServer(ServerName server,
2238      List<byte[]> encodedRegionNames) {
2239    return this.<Void> newMasterCaller()
2240        .action((controller, stub) -> this
2241          .<RecommissionRegionServerRequest, RecommissionRegionServerResponse, Void> call(controller,
2242            stub, RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames),
2243            (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null))
2244        .call();
2245  }
2246
2247  /**
2248   * Get the region location for the passed region name. The region name may be a full region name
2249   * or encoded region name. If the region does not found, then it'll throw an
2250   * UnknownRegionException wrapped by a {@link CompletableFuture}
2251   * @param regionNameOrEncodedRegionName
2252   * @return region location, wrapped by a {@link CompletableFuture}
2253   */
2254  @VisibleForTesting
2255  CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
2256    if (regionNameOrEncodedRegionName == null) {
2257      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2258    }
2259    try {
2260      CompletableFuture<Optional<HRegionLocation>> future;
2261      if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
2262        future = AsyncMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
2263          regionNameOrEncodedRegionName);
2264      } else {
2265        future = AsyncMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
2266      }
2267
2268      CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
2269      addListener(future, (location, err) -> {
2270        if (err != null) {
2271          returnedFuture.completeExceptionally(err);
2272          return;
2273        }
2274        if (!location.isPresent() || location.get().getRegion() == null) {
2275          returnedFuture.completeExceptionally(
2276            new UnknownRegionException("Invalid region name or encoded region name: " +
2277              Bytes.toStringBinary(regionNameOrEncodedRegionName)));
2278        } else {
2279          returnedFuture.complete(location.get());
2280        }
2281      });
2282      return returnedFuture;
2283    } catch (IOException e) {
2284      return failedFuture(e);
2285    }
2286  }
2287
2288  /**
2289   * Get the region info for the passed region name. The region name may be a full region name or
2290   * encoded region name. If the region does not found, then it'll throw an UnknownRegionException
2291   * wrapped by a {@link CompletableFuture}
2292   * @param regionNameOrEncodedRegionName
2293   * @return region info, wrapped by a {@link CompletableFuture}
2294   */
2295  private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
2296    if (regionNameOrEncodedRegionName == null) {
2297      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2298    }
2299
2300    if (Bytes.equals(regionNameOrEncodedRegionName,
2301      RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) ||
2302      Bytes.equals(regionNameOrEncodedRegionName,
2303        RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
2304      return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
2305    }
2306
2307    CompletableFuture<RegionInfo> future = new CompletableFuture<>();
2308    addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
2309      if (err != null) {
2310        future.completeExceptionally(err);
2311      } else {
2312        future.complete(location.getRegion());
2313      }
2314    });
2315    return future;
2316  }
2317
2318  private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
2319    if (numRegions < 3) {
2320      throw new IllegalArgumentException("Must create at least three regions");
2321    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
2322      throw new IllegalArgumentException("Start key must be smaller than end key");
2323    }
2324    if (numRegions == 3) {
2325      return new byte[][] { startKey, endKey };
2326    }
2327    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
2328    if (splitKeys == null || splitKeys.length != numRegions - 1) {
2329      throw new IllegalArgumentException("Unable to split key range into enough regions");
2330    }
2331    return splitKeys;
2332  }
2333
2334  private void verifySplitKeys(byte[][] splitKeys) {
2335    Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
2336    // Verify there are no duplicate split keys
2337    byte[] lastKey = null;
2338    for (byte[] splitKey : splitKeys) {
2339      if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
2340        throw new IllegalArgumentException("Empty split key must not be passed in the split keys.");
2341      }
2342      if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
2343        throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: "
2344            + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey));
2345      }
2346      lastKey = splitKey;
2347    }
2348  }
2349
2350  private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
2351
2352    abstract void onFinished();
2353
2354    abstract void onError(Throwable error);
2355
2356    @Override
2357    public void accept(Void v, Throwable error) {
2358      if (error != null) {
2359        onError(error);
2360        return;
2361      }
2362      onFinished();
2363    }
2364  }
2365
2366  private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
2367    protected final TableName tableName;
2368
2369    TableProcedureBiConsumer(TableName tableName) {
2370      this.tableName = tableName;
2371    }
2372
2373    abstract String getOperationType();
2374
2375    String getDescription() {
2376      return "Operation: " + getOperationType() + ", " + "Table Name: "
2377          + tableName.getNameWithNamespaceInclAsString();
2378    }
2379
2380    @Override
2381    void onFinished() {
2382      LOG.info(getDescription() + " completed");
2383    }
2384
2385    @Override
2386    void onError(Throwable error) {
2387      LOG.info(getDescription() + " failed with " + error.getMessage());
2388    }
2389  }
2390
2391  private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
2392    protected final String namespaceName;
2393
2394    NamespaceProcedureBiConsumer(String namespaceName) {
2395      this.namespaceName = namespaceName;
2396    }
2397
2398    abstract String getOperationType();
2399
2400    String getDescription() {
2401      return "Operation: " + getOperationType() + ", Namespace: " + namespaceName;
2402    }
2403
2404    @Override
2405    void onFinished() {
2406      LOG.info(getDescription() + " completed");
2407    }
2408
2409    @Override
2410    void onError(Throwable error) {
2411      LOG.info(getDescription() + " failed with " + error.getMessage());
2412    }
2413  }
2414
2415  private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2416
2417    CreateTableProcedureBiConsumer(TableName tableName) {
2418      super(tableName);
2419    }
2420
2421    @Override
2422    String getOperationType() {
2423      return "CREATE";
2424    }
2425  }
2426
2427  private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
2428
2429    ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2430      super(tableName);
2431    }
2432
2433    @Override
2434    String getOperationType() {
2435      return "ENABLE";
2436    }
2437  }
2438
2439  private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
2440
2441    DeleteTableProcedureBiConsumer(TableName tableName) {
2442      super(tableName);
2443    }
2444
2445    @Override
2446    String getOperationType() {
2447      return "DELETE";
2448    }
2449
2450    @Override
2451    void onFinished() {
2452      connection.getLocator().clearCache(this.tableName);
2453      super.onFinished();
2454    }
2455  }
2456
2457  private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2458
2459    TruncateTableProcedureBiConsumer(TableName tableName) {
2460      super(tableName);
2461    }
2462
2463    @Override
2464    String getOperationType() {
2465      return "TRUNCATE";
2466    }
2467  }
2468
2469  private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2470
2471    EnableTableProcedureBiConsumer(TableName tableName) {
2472      super(tableName);
2473    }
2474
2475    @Override
2476    String getOperationType() {
2477      return "ENABLE";
2478    }
2479  }
2480
2481  private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2482
2483    DisableTableProcedureBiConsumer(TableName tableName) {
2484      super(tableName);
2485    }
2486
2487    @Override
2488    String getOperationType() {
2489      return "DISABLE";
2490    }
2491  }
2492
2493  private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2494
2495    AddColumnFamilyProcedureBiConsumer(TableName tableName) {
2496      super(tableName);
2497    }
2498
2499    @Override
2500    String getOperationType() {
2501      return "ADD_COLUMN_FAMILY";
2502    }
2503  }
2504
2505  private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2506
2507    DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
2508      super(tableName);
2509    }
2510
2511    @Override
2512    String getOperationType() {
2513      return "DELETE_COLUMN_FAMILY";
2514    }
2515  }
2516
2517  private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2518
2519    ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
2520      super(tableName);
2521    }
2522
2523    @Override
2524    String getOperationType() {
2525      return "MODIFY_COLUMN_FAMILY";
2526    }
2527  }
2528
2529  private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2530
2531    CreateNamespaceProcedureBiConsumer(String namespaceName) {
2532      super(namespaceName);
2533    }
2534
2535    @Override
2536    String getOperationType() {
2537      return "CREATE_NAMESPACE";
2538    }
2539  }
2540
2541  private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2542
2543    DeleteNamespaceProcedureBiConsumer(String namespaceName) {
2544      super(namespaceName);
2545    }
2546
2547    @Override
2548    String getOperationType() {
2549      return "DELETE_NAMESPACE";
2550    }
2551  }
2552
2553  private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2554
2555    ModifyNamespaceProcedureBiConsumer(String namespaceName) {
2556      super(namespaceName);
2557    }
2558
2559    @Override
2560    String getOperationType() {
2561      return "MODIFY_NAMESPACE";
2562    }
2563  }
2564
2565  private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2566
2567    MergeTableRegionProcedureBiConsumer(TableName tableName) {
2568      super(tableName);
2569    }
2570
2571    @Override
2572    String getOperationType() {
2573      return "MERGE_REGIONS";
2574    }
2575  }
2576
2577  private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2578
2579    SplitTableRegionProcedureBiConsumer(TableName tableName) {
2580      super(tableName);
2581    }
2582
2583    @Override
2584    String getOperationType() {
2585      return "SPLIT_REGION";
2586    }
2587  }
2588
2589  private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
2590    private final String peerId;
2591    private final Supplier<String> getOperation;
2592
2593    ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) {
2594      this.peerId = peerId;
2595      this.getOperation = getOperation;
2596    }
2597
2598    String getDescription() {
2599      return "Operation: " + getOperation.get() + ", peerId: " + peerId;
2600    }
2601
2602    @Override
2603    void onFinished() {
2604      LOG.info(getDescription() + " completed");
2605    }
2606
2607    @Override
2608    void onError(Throwable error) {
2609      LOG.info(getDescription() + " failed with " + error.getMessage());
2610    }
2611  }
2612
2613  private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
2614    CompletableFuture<Void> future = new CompletableFuture<>();
2615    addListener(procFuture, (procId, error) -> {
2616      if (error != null) {
2617        future.completeExceptionally(error);
2618        return;
2619      }
2620      getProcedureResult(procId, future, 0);
2621    });
2622    return future;
2623  }
2624
2625  private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) {
2626    addListener(
2627      this.<GetProcedureResultResponse> newMasterCaller()
2628        .action((controller, stub) -> this
2629          .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call(
2630            controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
2631            (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
2632        .call(),
2633      (response, error) -> {
2634        if (error != null) {
2635          LOG.warn("failed to get the procedure result procId={}", procId,
2636            ConnectionUtils.translateException(error));
2637          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2638            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2639          return;
2640        }
2641        if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
2642          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2643            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2644          return;
2645        }
2646        if (response.hasException()) {
2647          IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
2648          future.completeExceptionally(ioe);
2649        } else {
2650          future.complete(null);
2651        }
2652      });
2653  }
2654
2655  private <T> CompletableFuture<T> failedFuture(Throwable error) {
2656    CompletableFuture<T> future = new CompletableFuture<>();
2657    future.completeExceptionally(error);
2658    return future;
2659  }
2660
2661  private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) {
2662    if (error != null) {
2663      future.completeExceptionally(error);
2664      return true;
2665    }
2666    return false;
2667  }
2668
2669  @Override
2670  public CompletableFuture<ClusterMetrics> getClusterMetrics() {
2671    return getClusterMetrics(EnumSet.allOf(Option.class));
2672  }
2673
2674  @Override
2675  public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) {
2676    return this
2677        .<ClusterMetrics> newMasterCaller()
2678        .action(
2679          (controller, stub) -> this
2680              .<GetClusterStatusRequest, GetClusterStatusResponse, ClusterMetrics> call(controller,
2681                stub, RequestConverter.buildGetClusterStatusRequest(options),
2682                (s, c, req, done) -> s.getClusterStatus(c, req, done),
2683                resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus()))).call();
2684  }
2685
2686  @Override
2687  public CompletableFuture<Void> shutdown() {
2688    return this
2689        .<Void> newMasterCaller()
2690        .action(
2691          (controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
2692            stub, ShutdownRequest.newBuilder().build(),
2693            (s, c, req, done) -> s.shutdown(c, req, done), resp -> null)).call();
2694  }
2695
2696  @Override
2697  public CompletableFuture<Void> stopMaster() {
2698    return this
2699        .<Void> newMasterCaller()
2700        .action(
2701          (controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(controller,
2702            stub, StopMasterRequest.newBuilder().build(),
2703            (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null)).call();
2704  }
2705
2706  @Override
2707  public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
2708    StopServerRequest request =
2709        RequestConverter.buildStopServerRequest("Called by admin client "
2710            + this.connection.toString());
2711    return this
2712        .<Void> newAdminCaller()
2713        .action(
2714          (controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
2715            controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
2716            resp -> null)).serverName(serverName).call();
2717  }
2718
2719  @Override
2720  public CompletableFuture<Void> updateConfiguration(ServerName serverName) {
2721    return this
2722        .<Void> newAdminCaller()
2723        .action(
2724          (controller, stub) -> this
2725              .<UpdateConfigurationRequest, UpdateConfigurationResponse, Void> adminCall(
2726                controller, stub, UpdateConfigurationRequest.getDefaultInstance(),
2727                (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null))
2728        .serverName(serverName).call();
2729  }
2730
2731  @Override
2732  public CompletableFuture<Void> updateConfiguration() {
2733    CompletableFuture<Void> future = new CompletableFuture<Void>();
2734    addListener(
2735      getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS, Option.MASTER, Option.BACKUP_MASTERS)),
2736      (status, err) -> {
2737        if (err != null) {
2738          future.completeExceptionally(err);
2739        } else {
2740          List<CompletableFuture<Void>> futures = new ArrayList<>();
2741          status.getLiveServerMetrics().keySet()
2742            .forEach(server -> futures.add(updateConfiguration(server)));
2743          futures.add(updateConfiguration(status.getMasterName()));
2744          status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
2745          addListener(
2746            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
2747            (result, err2) -> {
2748              if (err2 != null) {
2749                future.completeExceptionally(err2);
2750              } else {
2751                future.complete(result);
2752              }
2753            });
2754        }
2755      });
2756    return future;
2757  }
2758
2759  @Override
2760  public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
2761    return this
2762        .<Void> newAdminCaller()
2763        .action(
2764          (controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse, Void> adminCall(
2765            controller, stub, RequestConverter.buildRollWALWriterRequest(),
2766            (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null))
2767        .serverName(serverName).call();
2768  }
2769
2770  @Override
2771  public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
2772    return this
2773        .<Void> newAdminCaller()
2774        .action(
2775          (controller, stub) -> this
2776              .<ClearCompactionQueuesRequest, ClearCompactionQueuesResponse, Void> adminCall(
2777                controller, stub, RequestConverter.buildClearCompactionQueuesRequest(queues), (s,
2778                    c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
2779        .serverName(serverName).call();
2780  }
2781
2782  @Override
2783  public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
2784    return this
2785        .<List<SecurityCapability>> newMasterCaller()
2786        .action(
2787          (controller, stub) -> this
2788              .<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, List<SecurityCapability>> call(
2789                controller, stub, SecurityCapabilitiesRequest.newBuilder().build(), (s, c, req,
2790                    done) -> s.getSecurityCapabilities(c, req, done), (resp) -> ProtobufUtil
2791                    .toSecurityCapabilityList(resp.getCapabilitiesList()))).call();
2792  }
2793
2794  @Override
2795  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) {
2796    return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName);
2797  }
2798
2799  @Override
2800  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName,
2801      TableName tableName) {
2802    Preconditions.checkNotNull(tableName,
2803      "tableName is null. If you don't specify a tableName, use getRegionLoads() instead");
2804    return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName);
2805  }
2806
2807  private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request,
2808      ServerName serverName) {
2809    return this.<List<RegionMetrics>> newAdminCaller()
2810        .action((controller, stub) -> this
2811            .<GetRegionLoadRequest, GetRegionLoadResponse, List<RegionMetrics>>
2812              adminCall(controller, stub, request, (s, c, req, done) ->
2813                s.getRegionLoad(controller, req, done), RegionMetricsBuilder::toRegionMetrics))
2814        .serverName(serverName).call();
2815  }
2816
2817  @Override
2818  public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
2819    return this
2820        .<Boolean> newMasterCaller()
2821        .action(
2822          (controller, stub) -> this
2823              .<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, Boolean> call(controller,
2824                stub, IsInMaintenanceModeRequest.newBuilder().build(),
2825                (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
2826                resp -> resp.getInMaintenanceMode())).call();
2827  }
2828
2829  @Override
2830  public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
2831      CompactType compactType) {
2832    CompletableFuture<CompactionState> future = new CompletableFuture<>();
2833
2834    switch (compactType) {
2835      case MOB:
2836        addListener(connection.registry.getMasterAddress(), (serverName, err) -> {
2837          if (err != null) {
2838            future.completeExceptionally(err);
2839            return;
2840          }
2841          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
2842
2843          addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
2844            .action((controller, stub) -> this
2845              .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
2846                controller, stub,
2847                RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
2848                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
2849            .call(), (resp2, err2) -> {
2850              if (err2 != null) {
2851                future.completeExceptionally(err2);
2852              } else {
2853                if (resp2.hasCompactionState()) {
2854                  future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
2855                } else {
2856                  future.complete(CompactionState.NONE);
2857                }
2858              }
2859            });
2860        });
2861        break;
2862      case NORMAL:
2863        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
2864          if (err != null) {
2865            future.completeExceptionally(err);
2866            return;
2867          }
2868          ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>();
2869          List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
2870          locations.stream().filter(loc -> loc.getServerName() != null)
2871            .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
2872            .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
2873              futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
2874                // If any region compaction state is MAJOR_AND_MINOR
2875                // the table compaction state is MAJOR_AND_MINOR, too.
2876                if (err2 != null) {
2877                  future.completeExceptionally(unwrapCompletionException(err2));
2878                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
2879                  future.complete(regionState);
2880                } else {
2881                  regionStates.add(regionState);
2882                }
2883              }));
2884            });
2885          addListener(
2886            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
2887            (ret, err3) -> {
2888              // If future not completed, check all regions's compaction state
2889              if (!future.isCompletedExceptionally() && !future.isDone()) {
2890                CompactionState state = CompactionState.NONE;
2891                for (CompactionState regionState : regionStates) {
2892                  switch (regionState) {
2893                    case MAJOR:
2894                      if (state == CompactionState.MINOR) {
2895                        future.complete(CompactionState.MAJOR_AND_MINOR);
2896                      } else {
2897                        state = CompactionState.MAJOR;
2898                      }
2899                      break;
2900                    case MINOR:
2901                      if (state == CompactionState.MAJOR) {
2902                        future.complete(CompactionState.MAJOR_AND_MINOR);
2903                      } else {
2904                        state = CompactionState.MINOR;
2905                      }
2906                      break;
2907                    case NONE:
2908                    default:
2909                  }
2910                }
2911                if (!future.isDone()) {
2912                  future.complete(state);
2913                }
2914              }
2915            });
2916        });
2917        break;
2918      default:
2919        throw new IllegalArgumentException("Unknown compactType: " + compactType);
2920    }
2921
2922    return future;
2923  }
2924
2925  @Override
2926  public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
2927    CompletableFuture<CompactionState> future = new CompletableFuture<>();
2928    addListener(getRegionLocation(regionName), (location, err) -> {
2929      if (err != null) {
2930        future.completeExceptionally(err);
2931        return;
2932      }
2933      ServerName serverName = location.getServerName();
2934      if (serverName == null) {
2935        future
2936          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
2937        return;
2938      }
2939      addListener(
2940        this.<GetRegionInfoResponse> newAdminCaller()
2941          .action((controller, stub) -> this
2942            .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
2943              controller, stub,
2944              RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(),
2945                true),
2946              (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
2947          .serverName(serverName).call(),
2948        (resp2, err2) -> {
2949          if (err2 != null) {
2950            future.completeExceptionally(err2);
2951          } else {
2952            if (resp2.hasCompactionState()) {
2953              future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
2954            } else {
2955              future.complete(CompactionState.NONE);
2956            }
2957          }
2958        });
2959    });
2960    return future;
2961  }
2962
2963  @Override
2964  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
2965    MajorCompactionTimestampRequest request =
2966        MajorCompactionTimestampRequest.newBuilder()
2967            .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
2968    return this
2969        .<Optional<Long>> newMasterCaller()
2970        .action(
2971          (controller, stub) -> this
2972              .<MajorCompactionTimestampRequest, MajorCompactionTimestampResponse, Optional<Long>> call(
2973                controller, stub, request,
2974                (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done),
2975                ProtobufUtil::toOptionalTimestamp)).call();
2976  }
2977
2978  @Override
2979  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion(
2980      byte[] regionName) {
2981    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
2982    // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
2983    addListener(getRegionInfo(regionName), (region, err) -> {
2984      if (err != null) {
2985        future.completeExceptionally(err);
2986        return;
2987      }
2988      MajorCompactionTimestampForRegionRequest.Builder builder =
2989        MajorCompactionTimestampForRegionRequest.newBuilder();
2990      builder.setRegion(
2991        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
2992      addListener(this.<Optional<Long>> newMasterCaller().action((controller, stub) -> this
2993        .<MajorCompactionTimestampForRegionRequest,
2994        MajorCompactionTimestampResponse, Optional<Long>> call(
2995          controller, stub, builder.build(),
2996          (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done),
2997          ProtobufUtil::toOptionalTimestamp))
2998        .call(), (timestamp, err2) -> {
2999          if (err2 != null) {
3000            future.completeExceptionally(err2);
3001          } else {
3002            future.complete(timestamp);
3003          }
3004        });
3005    });
3006    return future;
3007  }
3008
3009  @Override
3010  public CompletableFuture<Boolean> balancerSwitch(final boolean on) {
3011    return this
3012        .<Boolean> newMasterCaller()
3013        .action(
3014          (controller, stub) -> this
3015              .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller,
3016                stub, RequestConverter.buildSetBalancerRunningRequest(on, true),
3017                (s, c, req, done) -> s.setBalancerRunning(c, req, done),
3018                (resp) -> resp.getPrevBalanceValue())).call();
3019  }
3020
3021  @Override
3022  public CompletableFuture<Boolean> balance(boolean forcible) {
3023    return this
3024        .<Boolean> newMasterCaller()
3025        .action(
3026          (controller, stub) -> this.<BalanceRequest, BalanceResponse, Boolean> call(controller,
3027            stub, RequestConverter.buildBalanceRequest(forcible),
3028            (s, c, req, done) -> s.balance(c, req, done), (resp) -> resp.getBalancerRan())).call();
3029  }
3030
3031  @Override
3032  public CompletableFuture<Boolean> isBalancerEnabled() {
3033    return this
3034        .<Boolean> newMasterCaller()
3035        .action(
3036          (controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(
3037            controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
3038            (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3039        .call();
3040  }
3041
3042  @Override
3043  public CompletableFuture<Boolean> normalizerSwitch(boolean on) {
3044    return this
3045        .<Boolean> newMasterCaller()
3046        .action(
3047          (controller, stub) -> this
3048              .<SetNormalizerRunningRequest, SetNormalizerRunningResponse, Boolean> call(
3049                controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on), (s, c,
3050                    req, done) -> s.setNormalizerRunning(c, req, done), (resp) -> resp
3051                    .getPrevNormalizerValue())).call();
3052  }
3053
3054  @Override
3055  public CompletableFuture<Boolean> isNormalizerEnabled() {
3056    return this
3057        .<Boolean> newMasterCaller()
3058        .action(
3059          (controller, stub) -> this
3060              .<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse, Boolean> call(controller,
3061                stub, RequestConverter.buildIsNormalizerEnabledRequest(),
3062                (s, c, req, done) -> s.isNormalizerEnabled(c, req, done),
3063                (resp) -> resp.getEnabled())).call();
3064  }
3065
3066  @Override
3067  public CompletableFuture<Boolean> normalize() {
3068    return this
3069        .<Boolean> newMasterCaller()
3070        .action(
3071          (controller, stub) -> this.<NormalizeRequest, NormalizeResponse, Boolean> call(
3072            controller, stub, RequestConverter.buildNormalizeRequest(),
3073            (s, c, req, done) -> s.normalize(c, req, done), (resp) -> resp.getNormalizerRan()))
3074        .call();
3075  }
3076
3077  @Override
3078  public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) {
3079    return this
3080        .<Boolean> newMasterCaller()
3081        .action(
3082          (controller, stub) -> this
3083              .<SetCleanerChoreRunningRequest, SetCleanerChoreRunningResponse, Boolean> call(
3084                controller, stub, RequestConverter.buildSetCleanerChoreRunningRequest(enabled), (s,
3085                    c, req, done) -> s.setCleanerChoreRunning(c, req, done), (resp) -> resp
3086                    .getPrevValue())).call();
3087  }
3088
3089  @Override
3090  public CompletableFuture<Boolean> isCleanerChoreEnabled() {
3091    return this
3092        .<Boolean> newMasterCaller()
3093        .action(
3094          (controller, stub) -> this
3095              .<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse, Boolean> call(
3096                controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(), (s, c, req,
3097                    done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue()))
3098        .call();
3099  }
3100
3101  @Override
3102  public CompletableFuture<Boolean> runCleanerChore() {
3103    return this
3104        .<Boolean> newMasterCaller()
3105        .action(
3106          (controller, stub) -> this
3107              .<RunCleanerChoreRequest, RunCleanerChoreResponse, Boolean> call(controller, stub,
3108                RequestConverter.buildRunCleanerChoreRequest(),
3109                (s, c, req, done) -> s.runCleanerChore(c, req, done),
3110                (resp) -> resp.getCleanerChoreRan())).call();
3111  }
3112
3113  @Override
3114  public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) {
3115    return this
3116        .<Boolean> newMasterCaller()
3117        .action(
3118          (controller, stub) -> this
3119              .<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse, Boolean> call(
3120                controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled), (s,
3121                    c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp
3122                    .getPrevValue())).call();
3123  }
3124
3125  @Override
3126  public CompletableFuture<Boolean> isCatalogJanitorEnabled() {
3127    return this
3128        .<Boolean> newMasterCaller()
3129        .action(
3130          (controller, stub) -> this
3131              .<IsCatalogJanitorEnabledRequest, IsCatalogJanitorEnabledResponse, Boolean> call(
3132                controller, stub, RequestConverter.buildIsCatalogJanitorEnabledRequest(), (s, c,
3133                    req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp
3134                    .getValue())).call();
3135  }
3136
3137  @Override
3138  public CompletableFuture<Integer> runCatalogJanitor() {
3139    return this
3140        .<Integer> newMasterCaller()
3141        .action(
3142          (controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse, Integer> call(
3143            controller, stub, RequestConverter.buildCatalogScanRequest(),
3144            (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
3145        .call();
3146  }
3147
3148  @Override
3149  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3150      ServiceCaller<S, R> callable) {
3151    MasterCoprocessorRpcChannelImpl channel =
3152        new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
3153    S stub = stubMaker.apply(channel);
3154    CompletableFuture<R> future = new CompletableFuture<>();
3155    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3156    callable.call(stub, controller, resp -> {
3157      if (controller.failed()) {
3158        future.completeExceptionally(controller.getFailed());
3159      } else {
3160        future.complete(resp);
3161      }
3162    });
3163    return future;
3164  }
3165
3166  @Override
3167  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3168      ServiceCaller<S, R> callable, ServerName serverName) {
3169    RegionServerCoprocessorRpcChannelImpl channel =
3170        new RegionServerCoprocessorRpcChannelImpl(this.<Message> newServerCaller().serverName(
3171          serverName));
3172    S stub = stubMaker.apply(channel);
3173    CompletableFuture<R> future = new CompletableFuture<>();
3174    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3175    callable.call(stub, controller, resp -> {
3176      if (controller.failed()) {
3177        future.completeExceptionally(controller.getFailed());
3178      } else {
3179        future.complete(resp);
3180      }
3181    });
3182    return future;
3183  }
3184
3185  @Override
3186  public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
3187    return this.<List<ServerName>> newMasterCaller()
3188      .action((controller, stub) -> this
3189        .<ClearDeadServersRequest, ClearDeadServersResponse, List<ServerName>> call(
3190          controller, stub, RequestConverter.buildClearDeadServersRequest(servers),
3191          (s, c, req, done) -> s.clearDeadServers(c, req, done),
3192          (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
3193      .call();
3194  }
3195
3196  private <T> ServerRequestCallerBuilder<T> newServerCaller() {
3197    return this.connection.callerFactory.<T> serverRequest()
3198        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
3199        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
3200        .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
3201        .startLogErrorsCnt(startLogErrorsCnt);
3202  }
3203
3204  @Override
3205  public CompletableFuture<Void> enableTableReplication(TableName tableName) {
3206    if (tableName == null) {
3207      return failedFuture(new IllegalArgumentException("Table name is null"));
3208    }
3209    CompletableFuture<Void> future = new CompletableFuture<>();
3210    addListener(tableExists(tableName), (exist, err) -> {
3211      if (err != null) {
3212        future.completeExceptionally(err);
3213        return;
3214      }
3215      if (!exist) {
3216        future.completeExceptionally(new TableNotFoundException(
3217          "Table '" + tableName.getNameAsString() + "' does not exists."));
3218        return;
3219      }
3220      addListener(getTableSplits(tableName), (splits, err1) -> {
3221        if (err1 != null) {
3222          future.completeExceptionally(err1);
3223        } else {
3224          addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> {
3225            if (err2 != null) {
3226              future.completeExceptionally(err2);
3227            } else {
3228              addListener(setTableReplication(tableName, true), (result3, err3) -> {
3229                if (err3 != null) {
3230                  future.completeExceptionally(err3);
3231                } else {
3232                  future.complete(result3);
3233                }
3234              });
3235            }
3236          });
3237        }
3238      });
3239    });
3240    return future;
3241  }
3242
3243  @Override
3244  public CompletableFuture<Void> disableTableReplication(TableName tableName) {
3245    if (tableName == null) {
3246      return failedFuture(new IllegalArgumentException("Table name is null"));
3247    }
3248    CompletableFuture<Void> future = new CompletableFuture<>();
3249    addListener(tableExists(tableName), (exist, err) -> {
3250      if (err != null) {
3251        future.completeExceptionally(err);
3252        return;
3253      }
3254      if (!exist) {
3255        future.completeExceptionally(new TableNotFoundException(
3256          "Table '" + tableName.getNameAsString() + "' does not exists."));
3257        return;
3258      }
3259      addListener(setTableReplication(tableName, false), (result, err2) -> {
3260        if (err2 != null) {
3261          future.completeExceptionally(err2);
3262        } else {
3263          future.complete(result);
3264        }
3265      });
3266    });
3267    return future;
3268  }
3269
3270  private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
3271    CompletableFuture<byte[][]> future = new CompletableFuture<>();
3272    addListener(getRegions(tableName), (regions, err2) -> {
3273      if (err2 != null) {
3274        future.completeExceptionally(err2);
3275        return;
3276      }
3277      if (regions.size() == 1) {
3278        future.complete(null);
3279      } else {
3280        byte[][] splits = new byte[regions.size() - 1][];
3281        for (int i = 1; i < regions.size(); i++) {
3282          splits[i - 1] = regions.get(i).getStartKey();
3283        }
3284        future.complete(splits);
3285      }
3286    });
3287    return future;
3288  }
3289
3290  /**
3291   * Connect to peer and check the table descriptor on peer:
3292   * <ol>
3293   * <li>Create the same table on peer when not exist.</li>
3294   * <li>Throw an exception if the table already has replication enabled on any of the column
3295   * families.</li>
3296   * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
3297   * </ol>
3298   * @param tableName name of the table to sync to the peer
3299   * @param splits table split keys
3300   */
3301  private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
3302      byte[][] splits) {
3303    CompletableFuture<Void> future = new CompletableFuture<>();
3304    addListener(listReplicationPeers(), (peers, err) -> {
3305      if (err != null) {
3306        future.completeExceptionally(err);
3307        return;
3308      }
3309      if (peers == null || peers.size() <= 0) {
3310        future.completeExceptionally(
3311          new IllegalArgumentException("Found no peer cluster for replication."));
3312        return;
3313      }
3314      List<CompletableFuture<Void>> futures = new ArrayList<>();
3315      peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
3316        .forEach(peer -> {
3317          futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
3318        });
3319      addListener(
3320        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3321        (result, err2) -> {
3322          if (err2 != null) {
3323            future.completeExceptionally(err2);
3324          } else {
3325            future.complete(result);
3326          }
3327        });
3328    });
3329    return future;
3330  }
3331
3332  private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
3333      ReplicationPeerDescription peer) {
3334    Configuration peerConf = null;
3335    try {
3336      peerConf =
3337        ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer);
3338    } catch (IOException e) {
3339      return failedFuture(e);
3340    }
3341    CompletableFuture<Void> future = new CompletableFuture<>();
3342    addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> {
3343      if (err != null) {
3344        future.completeExceptionally(err);
3345        return;
3346      }
3347      addListener(getDescriptor(tableName), (tableDesc, err1) -> {
3348        if (err1 != null) {
3349          future.completeExceptionally(err1);
3350          return;
3351        }
3352        AsyncAdmin peerAdmin = conn.getAdmin();
3353        addListener(peerAdmin.tableExists(tableName), (exist, err2) -> {
3354          if (err2 != null) {
3355            future.completeExceptionally(err2);
3356            return;
3357          }
3358          if (!exist) {
3359            CompletableFuture<Void> createTableFuture = null;
3360            if (splits == null) {
3361              createTableFuture = peerAdmin.createTable(tableDesc);
3362            } else {
3363              createTableFuture = peerAdmin.createTable(tableDesc, splits);
3364            }
3365            addListener(createTableFuture, (result, err3) -> {
3366              if (err3 != null) {
3367                future.completeExceptionally(err3);
3368              } else {
3369                future.complete(result);
3370              }
3371            });
3372          } else {
3373            addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin),
3374              (result, err4) -> {
3375                if (err4 != null) {
3376                  future.completeExceptionally(err4);
3377                } else {
3378                  future.complete(result);
3379                }
3380              });
3381          }
3382        });
3383      });
3384    });
3385    return future;
3386  }
3387
3388  private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
3389      TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
3390    CompletableFuture<Void> future = new CompletableFuture<>();
3391    addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> {
3392      if (err != null) {
3393        future.completeExceptionally(err);
3394        return;
3395      }
3396      if (peerTableDesc == null) {
3397        future.completeExceptionally(
3398          new IllegalArgumentException("Failed to get table descriptor for table " +
3399            tableName.getNameAsString() + " from peer cluster " + peer.getPeerId()));
3400        return;
3401      }
3402      if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
3403        future.completeExceptionally(new IllegalArgumentException(
3404          "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() +
3405            ", but the table descriptors are not same when compared with source cluster." +
3406            " Thus can not enable the table's replication switch."));
3407        return;
3408      }
3409      future.complete(null);
3410    });
3411    return future;
3412  }
3413
3414  /**
3415   * Set the table's replication switch if the table's replication switch is already not set.
3416   * @param tableName name of the table
3417   * @param enableRep is replication switch enable or disable
3418   */
3419  private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
3420    CompletableFuture<Void> future = new CompletableFuture<>();
3421    addListener(getDescriptor(tableName), (tableDesc, err) -> {
3422      if (err != null) {
3423        future.completeExceptionally(err);
3424        return;
3425      }
3426      if (!tableDesc.matchReplicationScope(enableRep)) {
3427        int scope =
3428          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
3429        TableDescriptor newTableDesc =
3430          TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
3431        addListener(modifyTable(newTableDesc), (result, err2) -> {
3432          if (err2 != null) {
3433            future.completeExceptionally(err2);
3434          } else {
3435            future.complete(result);
3436          }
3437        });
3438      } else {
3439        future.complete(null);
3440      }
3441    });
3442    return future;
3443  }
3444
3445  @Override
3446  public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
3447    CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
3448    addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3449      if (err != null) {
3450        future.completeExceptionally(err);
3451        return;
3452      }
3453      Map<ServerName, List<RegionInfo>> regionInfoByServerName =
3454        locations.stream().filter(l -> l.getRegion() != null)
3455          .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
3456          .collect(Collectors.groupingBy(l -> l.getServerName(),
3457            Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
3458      List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
3459      CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
3460      for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
3461        futures
3462          .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
3463            if (err2 != null) {
3464              future.completeExceptionally(unwrapCompletionException(err2));
3465            } else {
3466              aggregator.append(stats);
3467            }
3468          }));
3469      }
3470      addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
3471        (ret, err3) -> {
3472          if (err3 != null) {
3473            future.completeExceptionally(unwrapCompletionException(err3));
3474          } else {
3475            future.complete(aggregator.sum());
3476          }
3477        });
3478    });
3479    return future;
3480  }
3481
3482  @Override
3483  public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName,
3484      boolean preserveSplits) {
3485    CompletableFuture<Void> future = new CompletableFuture<>();
3486    addListener(tableExists(tableName), (exist, err) -> {
3487      if (err != null) {
3488        future.completeExceptionally(err);
3489        return;
3490      }
3491      if (!exist) {
3492        future.completeExceptionally(new TableNotFoundException(tableName));
3493        return;
3494      }
3495      addListener(tableExists(newTableName), (exist1, err1) -> {
3496        if (err1 != null) {
3497          future.completeExceptionally(err1);
3498          return;
3499        }
3500        if (exist1) {
3501          future.completeExceptionally(new TableExistsException(newTableName));
3502          return;
3503        }
3504        addListener(getDescriptor(tableName), (tableDesc, err2) -> {
3505          if (err2 != null) {
3506            future.completeExceptionally(err2);
3507            return;
3508          }
3509          TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc);
3510          if (preserveSplits) {
3511            addListener(getTableSplits(tableName), (splits, err3) -> {
3512              if (err3 != null) {
3513                future.completeExceptionally(err3);
3514              } else {
3515                addListener(createTable(newTableDesc, splits), (result, err4) -> {
3516                  if (err4 != null) {
3517                    future.completeExceptionally(err4);
3518                  } else {
3519                    future.complete(result);
3520                  }
3521                });
3522              }
3523            });
3524          } else {
3525            addListener(createTable(newTableDesc), (result, err5) -> {
3526              if (err5 != null) {
3527                future.completeExceptionally(err5);
3528              } else {
3529                future.complete(result);
3530              }
3531            });
3532          }
3533        });
3534      });
3535    });
3536    return future;
3537  }
3538
3539  private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
3540      List<RegionInfo> hris) {
3541    return this.<CacheEvictionStats> newAdminCaller().action((controller, stub) -> this
3542      .<ClearRegionBlockCacheRequest, ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(
3543        controller, stub, RequestConverter.buildClearRegionBlockCacheRequest(hris),
3544        (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
3545        resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
3546      .serverName(serverName).call();
3547  }
3548}