001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
021import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
022import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
023
024import com.google.protobuf.Message;
025import com.google.protobuf.RpcChannel;
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.EnumSet;
031import java.util.HashMap;
032import java.util.List;
033import java.util.Map;
034import java.util.Optional;
035import java.util.Set;
036import java.util.concurrent.CompletableFuture;
037import java.util.concurrent.TimeUnit;
038import java.util.concurrent.atomic.AtomicReference;
039import java.util.function.BiConsumer;
040import java.util.function.Function;
041import java.util.regex.Pattern;
042import java.util.stream.Collectors;
043import java.util.stream.Stream;
044import org.apache.commons.io.IOUtils;
045import org.apache.hadoop.conf.Configuration;
046import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
047import org.apache.hadoop.hbase.CacheEvictionStats;
048import org.apache.hadoop.hbase.CacheEvictionStatsAggregator;
049import org.apache.hadoop.hbase.ClusterMetrics;
050import org.apache.hadoop.hbase.ClusterMetrics.Option;
051import org.apache.hadoop.hbase.ClusterMetricsBuilder;
052import org.apache.hadoop.hbase.HConstants;
053import org.apache.hadoop.hbase.HRegionLocation;
054import org.apache.hadoop.hbase.MetaTableAccessor;
055import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
056import org.apache.hadoop.hbase.NamespaceDescriptor;
057import org.apache.hadoop.hbase.RegionLocations;
058import org.apache.hadoop.hbase.RegionMetrics;
059import org.apache.hadoop.hbase.RegionMetricsBuilder;
060import org.apache.hadoop.hbase.ServerName;
061import org.apache.hadoop.hbase.TableExistsException;
062import org.apache.hadoop.hbase.TableName;
063import org.apache.hadoop.hbase.TableNotDisabledException;
064import org.apache.hadoop.hbase.TableNotEnabledException;
065import org.apache.hadoop.hbase.TableNotFoundException;
066import org.apache.hadoop.hbase.UnknownRegionException;
067import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
068import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
069import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
070import org.apache.hadoop.hbase.client.Scan.ReadType;
071import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
072import org.apache.hadoop.hbase.client.replication.TableCFs;
073import org.apache.hadoop.hbase.client.security.SecurityCapability;
074import org.apache.hadoop.hbase.exceptions.DeserializationException;
075import org.apache.hadoop.hbase.ipc.HBaseRpcController;
076import org.apache.hadoop.hbase.quotas.QuotaFilter;
077import org.apache.hadoop.hbase.quotas.QuotaSettings;
078import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
079import org.apache.hadoop.hbase.replication.ReplicationException;
080import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
081import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
082import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
083import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
084import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
085import org.apache.hadoop.hbase.util.Bytes;
086import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
087import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
088import org.apache.yetus.audience.InterfaceAudience;
089import org.slf4j.Logger;
090import org.slf4j.LoggerFactory;
091
092import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
093import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
094import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
095import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
096import org.apache.hbase.thirdparty.io.netty.util.Timeout;
097import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
098
099import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
100import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
101import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
102import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
103import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
104import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
105import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
106import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
107import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
108import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
109import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
110import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
111import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
112import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
113import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
114import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
115import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
116import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
117import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
147import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
148import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
149import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
150import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
153import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
248import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
249import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
250import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
251import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
252import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
253import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
254import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
255import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
256import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
257import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
258import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
259import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
260import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
261import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
262
263/**
264 * The implementation of AsyncAdmin.
265 * <p>
266 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
267 * be finished inside the rpc framework thread, which means that the callbacks registered to the
268 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
269 * this class should not try to do time consuming tasks in the callbacks.
270 * @since 2.0.0
271 * @see AsyncHBaseAdmin
272 * @see AsyncConnection#getAdmin()
273 * @see AsyncConnection#getAdminBuilder()
274 */
275@InterfaceAudience.Private
276class RawAsyncHBaseAdmin implements AsyncAdmin {
277  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
278
279  private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class);
280
281  private final AsyncConnectionImpl connection;
282
283  private final HashedWheelTimer retryTimer;
284
285  private final AsyncTable<AdvancedScanResultConsumer> metaTable;
286
287  private final long rpcTimeoutNs;
288
289  private final long operationTimeoutNs;
290
291  private final long pauseNs;
292
293  private final int maxAttempts;
294
295  private final int startLogErrorsCnt;
296
297  private final NonceGenerator ng;
298
299  RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer,
300      AsyncAdminBuilderBase builder) {
301    this.connection = connection;
302    this.retryTimer = retryTimer;
303    this.metaTable = connection.getTable(META_TABLE_NAME);
304    this.rpcTimeoutNs = builder.rpcTimeoutNs;
305    this.operationTimeoutNs = builder.operationTimeoutNs;
306    this.pauseNs = builder.pauseNs;
307    this.maxAttempts = builder.maxAttempts;
308    this.startLogErrorsCnt = builder.startLogErrorsCnt;
309    this.ng = connection.getNonceGenerator();
310  }
311
312  private <T> MasterRequestCallerBuilder<T> newMasterCaller() {
313    return this.connection.callerFactory.<T> masterRequest()
314        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
315        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
316        .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
317        .startLogErrorsCnt(startLogErrorsCnt);
318  }
319
320  private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
321    return this.connection.callerFactory.<T> adminRequest()
322        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
323        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
324        .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
325        .startLogErrorsCnt(startLogErrorsCnt);
326  }
327
328  @FunctionalInterface
329  private interface MasterRpcCall<RESP, REQ> {
330    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
331        RpcCallback<RESP> done);
332  }
333
334  @FunctionalInterface
335  private interface AdminRpcCall<RESP, REQ> {
336    void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
337        RpcCallback<RESP> done);
338  }
339
340  @FunctionalInterface
341  private interface Converter<D, S> {
342    D convert(S src) throws IOException;
343  }
344
345  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
346      MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
347      Converter<RESP, PRESP> respConverter) {
348    CompletableFuture<RESP> future = new CompletableFuture<>();
349    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
350
351      @Override
352      public void run(PRESP resp) {
353        if (controller.failed()) {
354          future.completeExceptionally(controller.getFailed());
355        } else {
356          try {
357            future.complete(respConverter.convert(resp));
358          } catch (IOException e) {
359            future.completeExceptionally(e);
360          }
361        }
362      }
363    });
364    return future;
365  }
366
367  private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
368      AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
369      Converter<RESP, PRESP> respConverter) {
370
371    CompletableFuture<RESP> future = new CompletableFuture<>();
372    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
373
374      @Override
375      public void run(PRESP resp) {
376        if (controller.failed()) {
377          future.completeExceptionally(new IOException(controller.errorText()));
378        } else {
379          try {
380            future.complete(respConverter.convert(resp));
381          } catch (IOException e) {
382            future.completeExceptionally(e);
383          }
384        }
385      }
386    });
387    return future;
388  }
389
390  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
391      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
392      ProcedureBiConsumer consumer) {
393    CompletableFuture<Long> procFuture =
394      this.<Long> newMasterCaller().action((controller, stub) -> this
395        .<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter)).call();
396    CompletableFuture<Void> future = waitProcedureResult(procFuture);
397    addListener(future, consumer);
398    return future;
399  }
400
401  @FunctionalInterface
402  private interface TableOperator {
403    CompletableFuture<Void> operate(TableName table);
404  }
405
406  @Override
407  public CompletableFuture<Boolean> tableExists(TableName tableName) {
408    if (TableName.isMetaTableName(tableName)) {
409      return CompletableFuture.completedFuture(true);
410    }
411    return AsyncMetaTableAccessor.tableExists(metaTable, tableName);
412  }
413
414  @Override
415  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) {
416    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(null,
417      includeSysTables));
418  }
419
420  /**
421   * {@link #listTables(boolean)}
422   */
423  @Override
424  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern,
425      boolean includeSysTables) {
426    Preconditions.checkNotNull(pattern,
427      "pattern is null. If you don't specify a pattern, use listTables(boolean) instead");
428    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(pattern,
429      includeSysTables));
430  }
431
432  private CompletableFuture<List<TableDescriptor>>
433      getTableDescriptors(GetTableDescriptorsRequest request) {
434    return this.<List<TableDescriptor>> newMasterCaller()
435        .action((controller, stub) -> this
436            .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableDescriptor>> call(
437              controller, stub, request, (s, c, req, done) -> s.getTableDescriptors(c, req, done),
438              (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
439        .call();
440  }
441
442  @Override
443  public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) {
444    return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables));
445  }
446
447  @Override
448  public CompletableFuture<List<TableName>>
449      listTableNames(Pattern pattern, boolean includeSysTables) {
450    Preconditions.checkNotNull(pattern,
451        "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead");
452    return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables));
453  }
454
455  private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) {
456    return this
457        .<List<TableName>> newMasterCaller()
458        .action(
459          (controller, stub) -> this
460              .<GetTableNamesRequest, GetTableNamesResponse, List<TableName>> call(controller,
461                stub, request, (s, c, req, done) -> s.getTableNames(c, req, done),
462                (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))).call();
463  }
464
465  @Override
466  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) {
467    return this.<List<TableDescriptor>> newMasterCaller().action((controller, stub) -> this
468        .<ListTableDescriptorsByNamespaceRequest, ListTableDescriptorsByNamespaceResponse,
469        List<TableDescriptor>> call(
470          controller, stub,
471          ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
472          (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done),
473          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
474        .call();
475  }
476
477  @Override
478  public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) {
479    return this.<List<TableName>> newMasterCaller().action((controller, stub) -> this
480        .<ListTableNamesByNamespaceRequest, ListTableNamesByNamespaceResponse,
481        List<TableName>> call(
482          controller, stub,
483          ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
484          (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done),
485          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList())))
486        .call();
487  }
488
489  @Override
490  public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
491    CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
492    addListener(this.<List<TableSchema>> newMasterCaller()
493      .action((controller, stub) -> this
494        .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
495          controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName),
496          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
497          (resp) -> resp.getTableSchemaList()))
498      .call(), (tableSchemas, error) -> {
499        if (error != null) {
500          future.completeExceptionally(error);
501          return;
502        }
503        if (!tableSchemas.isEmpty()) {
504          future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
505        } else {
506          future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
507        }
508      });
509    return future;
510  }
511
512  @Override
513  public CompletableFuture<Void> createTable(TableDescriptor desc) {
514    return createTable(desc.getTableName(),
515      RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce()));
516  }
517
518  @Override
519  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
520      int numRegions) {
521    try {
522      return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
523    } catch (IllegalArgumentException e) {
524      return failedFuture(e);
525    }
526  }
527
528  @Override
529  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
530    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
531        + " use createTable(TableDescriptor) instead");
532    try {
533      verifySplitKeys(splitKeys);
534      return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc,
535        splitKeys, ng.getNonceGroup(), ng.newNonce()));
536    } catch (IllegalArgumentException e) {
537      return failedFuture(e);
538    }
539  }
540
541  private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
542    Preconditions.checkNotNull(tableName, "table name is null");
543    return this.<CreateTableRequest, CreateTableResponse> procedureCall(request,
544      (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
545      new CreateTableProcedureBiConsumer(tableName));
546  }
547
548  @Override
549  public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
550    return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(
551      RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
552        ng.newNonce()), (s, c, req, done) -> s.modifyTable(c, req, done),
553      (resp) -> resp.getProcId(), new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
554  }
555
556  @Override
557  public CompletableFuture<Void> deleteTable(TableName tableName) {
558    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(RequestConverter
559        .buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
560      (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
561      new DeleteTableProcedureBiConsumer(tableName));
562  }
563
564  @Override
565  public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
566    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(
567      RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
568        ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done),
569      (resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(tableName));
570  }
571
572  @Override
573  public CompletableFuture<Void> enableTable(TableName tableName) {
574    return this.<EnableTableRequest, EnableTableResponse> procedureCall(RequestConverter
575        .buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
576      (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
577      new EnableTableProcedureBiConsumer(tableName));
578  }
579
580  @Override
581  public CompletableFuture<Void> disableTable(TableName tableName) {
582    return this.<DisableTableRequest, DisableTableResponse> procedureCall(RequestConverter
583        .buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
584      (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
585      new DisableTableProcedureBiConsumer(tableName));
586  }
587
588  @Override
589  public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
590    if (TableName.isMetaTableName(tableName)) {
591      return CompletableFuture.completedFuture(true);
592    }
593    CompletableFuture<Boolean> future = new CompletableFuture<>();
594    addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (state, error) -> {
595      if (error != null) {
596        future.completeExceptionally(error);
597        return;
598      }
599      if (state.isPresent()) {
600        future.complete(state.get().inStates(TableState.State.ENABLED));
601      } else {
602        future.completeExceptionally(new TableNotFoundException(tableName));
603      }
604    });
605    return future;
606  }
607
608  @Override
609  public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
610    if (TableName.isMetaTableName(tableName)) {
611      return CompletableFuture.completedFuture(false);
612    }
613    CompletableFuture<Boolean> future = new CompletableFuture<>();
614    addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (state, error) -> {
615      if (error != null) {
616        future.completeExceptionally(error);
617        return;
618      }
619      if (state.isPresent()) {
620        future.complete(state.get().inStates(TableState.State.DISABLED));
621      } else {
622        future.completeExceptionally(new TableNotFoundException(tableName));
623      }
624    });
625    return future;
626  }
627
628  @Override
629  public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
630    return isTableAvailable(tableName, Optional.empty());
631  }
632
633  @Override
634  public CompletableFuture<Boolean> isTableAvailable(TableName tableName, byte[][] splitKeys) {
635    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
636        + " use isTableAvailable(TableName) instead");
637    return isTableAvailable(tableName, Optional.of(splitKeys));
638  }
639
640  private CompletableFuture<Boolean> isTableAvailable(TableName tableName,
641      Optional<byte[][]> splitKeys) {
642    if (TableName.isMetaTableName(tableName)) {
643      return connection.registry.getMetaRegionLocation().thenApply(locs -> Stream
644        .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
645    }
646    CompletableFuture<Boolean> future = new CompletableFuture<>();
647    addListener(isTableEnabled(tableName), (enabled, error) -> {
648      if (error != null) {
649        future.completeExceptionally(error);
650        return;
651      }
652      if (!enabled) {
653        future.complete(false);
654      } else {
655        addListener(
656          AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)),
657          (locations, error1) -> {
658            if (error1 != null) {
659              future.completeExceptionally(error1);
660              return;
661            }
662            List<HRegionLocation> notDeployedRegions = locations.stream()
663              .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
664            if (notDeployedRegions.size() > 0) {
665              if (LOG.isDebugEnabled()) {
666                LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
667              }
668              future.complete(false);
669              return;
670            }
671
672            Optional<Boolean> available =
673              splitKeys.map(keys -> compareRegionsWithSplitKeys(locations, keys));
674            future.complete(available.orElse(true));
675          });
676      }
677    });
678    return future;
679  }
680
681  private boolean compareRegionsWithSplitKeys(List<HRegionLocation> locations, byte[][] splitKeys) {
682    int regionCount = 0;
683    for (HRegionLocation location : locations) {
684      RegionInfo info = location.getRegion();
685      if (Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
686        regionCount++;
687        continue;
688      }
689      for (byte[] splitKey : splitKeys) {
690        // Just check if the splitkey is available
691        if (Bytes.equals(info.getStartKey(), splitKey)) {
692          regionCount++;
693          break;
694        }
695      }
696    }
697    return regionCount == splitKeys.length + 1;
698  }
699
700  @Override
701  public CompletableFuture<Void> addColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamily) {
702    return this.<AddColumnRequest, AddColumnResponse> procedureCall(
703      RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
704        ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
705      new AddColumnFamilyProcedureBiConsumer(tableName));
706  }
707
708  @Override
709  public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
710    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(
711      RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
712        ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done),
713      (resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(tableName));
714  }
715
716  @Override
717  public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
718      ColumnFamilyDescriptor columnFamily) {
719    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(
720      RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
721        ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done),
722      (resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(tableName));
723  }
724
725  @Override
726  public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
727    return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
728      RequestConverter.buildCreateNamespaceRequest(descriptor),
729      (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
730      new CreateNamespaceProcedureBiConsumer(descriptor.getName()));
731  }
732
733  @Override
734  public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
735    return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
736      RequestConverter.buildModifyNamespaceRequest(descriptor),
737      (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
738      new ModifyNamespaceProcedureBiConsumer(descriptor.getName()));
739  }
740
741  @Override
742  public CompletableFuture<Void> deleteNamespace(String name) {
743    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
744      RequestConverter.buildDeleteNamespaceRequest(name),
745      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
746      new DeleteNamespaceProcedureBiConsumer(name));
747  }
748
749  @Override
750  public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
751    return this
752        .<NamespaceDescriptor> newMasterCaller()
753        .action(
754          (controller, stub) -> this
755              .<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor> call(
756                controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name), (s, c,
757                    req, done) -> s.getNamespaceDescriptor(c, req, done), (resp) -> ProtobufUtil
758                    .toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call();
759  }
760
761  @Override
762  public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
763    return this
764        .<List<NamespaceDescriptor>> newMasterCaller()
765        .action(
766          (controller, stub) -> this
767              .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(
768                controller, stub, ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req,
769                    done) -> s.listNamespaceDescriptors(c, req, done), (resp) -> ProtobufUtil
770                    .toNamespaceDescriptorList(resp))).call();
771  }
772
773  @Override
774  public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
775    return this.<List<RegionInfo>> newAdminCaller()
776        .action((controller, stub) -> this
777            .<GetOnlineRegionRequest, GetOnlineRegionResponse, List<RegionInfo>> adminCall(
778              controller, stub, RequestConverter.buildGetOnlineRegionRequest(),
779              (s, c, req, done) -> s.getOnlineRegion(c, req, done),
780              resp -> ProtobufUtil.getRegionInfos(resp)))
781        .serverName(serverName).call();
782  }
783
784  @Override
785  public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
786    if (tableName.equals(META_TABLE_NAME)) {
787      return connection.getLocator().getRegionLocation(tableName, null, null, operationTimeoutNs)
788          .thenApply(loc -> Collections.singletonList(loc.getRegion()));
789    } else {
790      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName))
791          .thenApply(
792            locs -> locs.stream().map(loc -> loc.getRegion()).collect(Collectors.toList()));
793    }
794  }
795
796  @Override
797  public CompletableFuture<Void> flush(TableName tableName) {
798    CompletableFuture<Void> future = new CompletableFuture<>();
799    addListener(tableExists(tableName), (exists, err) -> {
800      if (err != null) {
801        future.completeExceptionally(err);
802      } else if (!exists) {
803        future.completeExceptionally(new TableNotFoundException(tableName));
804      } else {
805        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
806          if (err2 != null) {
807            future.completeExceptionally(err2);
808          } else if (!tableEnabled) {
809            future.completeExceptionally(new TableNotEnabledException(tableName));
810          } else {
811            addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
812              new HashMap<>()), (ret, err3) -> {
813                if (err3 != null) {
814                  future.completeExceptionally(err3);
815                } else {
816                  future.complete(ret);
817                }
818              });
819          }
820        });
821      }
822    });
823    return future;
824  }
825
826  @Override
827  public CompletableFuture<Void> flushRegion(byte[] regionName) {
828    CompletableFuture<Void> future = new CompletableFuture<>();
829    addListener(getRegionLocation(regionName), (location, err) -> {
830      if (err != null) {
831        future.completeExceptionally(err);
832        return;
833      }
834      ServerName serverName = location.getServerName();
835      if (serverName == null) {
836        future
837          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
838        return;
839      }
840      addListener(flush(serverName, location.getRegion()), (ret, err2) -> {
841        if (err2 != null) {
842          future.completeExceptionally(err2);
843        } else {
844          future.complete(ret);
845        }
846      });
847    });
848    return future;
849  }
850
851  private CompletableFuture<Void> flush(final ServerName serverName, final RegionInfo regionInfo) {
852    return this.<Void> newAdminCaller()
853            .serverName(serverName)
854            .action(
855              (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall(
856                controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo
857                  .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done),
858                resp -> null))
859            .call();
860  }
861
862  @Override
863  public CompletableFuture<Void> flushRegionServer(ServerName sn) {
864    CompletableFuture<Void> future = new CompletableFuture<>();
865    addListener(getRegions(sn), (hRegionInfos, err) -> {
866      if (err != null) {
867        future.completeExceptionally(err);
868        return;
869      }
870      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
871      if (hRegionInfos != null) {
872        hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region)));
873      }
874      addListener(CompletableFuture.allOf(
875        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
876          if (err2 != null) {
877            future.completeExceptionally(err2);
878          } else {
879            future.complete(ret);
880          }
881        });
882    });
883    return future;
884  }
885
886  @Override
887  public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
888    return compact(tableName, null, false, compactType);
889  }
890
891  @Override
892  public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
893      CompactType compactType) {
894    Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
895        + "If you don't specify a columnFamily, use compact(TableName) instead");
896    return compact(tableName, columnFamily, false, compactType);
897  }
898
899  @Override
900  public CompletableFuture<Void> compactRegion(byte[] regionName) {
901    return compactRegion(regionName, null, false);
902  }
903
904  @Override
905  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
906    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
907        + " If you don't specify a columnFamily, use compactRegion(regionName) instead");
908    return compactRegion(regionName, columnFamily, false);
909  }
910
911  @Override
912  public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
913    return compact(tableName, null, true, compactType);
914  }
915
916  @Override
917  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
918      CompactType compactType) {
919    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
920        + "If you don't specify a columnFamily, use compact(TableName) instead");
921    return compact(tableName, columnFamily, true, compactType);
922  }
923
924  @Override
925  public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
926    return compactRegion(regionName, null, true);
927  }
928
929  @Override
930  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
931    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
932        + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead");
933    return compactRegion(regionName, columnFamily, true);
934  }
935
936  @Override
937  public CompletableFuture<Void> compactRegionServer(ServerName sn) {
938    return compactRegionServer(sn, false);
939  }
940
941  @Override
942  public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
943    return compactRegionServer(sn, true);
944  }
945
946  private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
947    CompletableFuture<Void> future = new CompletableFuture<>();
948    addListener(getRegions(sn), (hRegionInfos, err) -> {
949      if (err != null) {
950        future.completeExceptionally(err);
951        return;
952      }
953      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
954      if (hRegionInfos != null) {
955        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
956      }
957      addListener(CompletableFuture.allOf(
958        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
959          if (err2 != null) {
960            future.completeExceptionally(err2);
961          } else {
962            future.complete(ret);
963          }
964        });
965    });
966    return future;
967  }
968
969  private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
970      boolean major) {
971    CompletableFuture<Void> future = new CompletableFuture<>();
972    addListener(getRegionLocation(regionName), (location, err) -> {
973      if (err != null) {
974        future.completeExceptionally(err);
975        return;
976      }
977      ServerName serverName = location.getServerName();
978      if (serverName == null) {
979        future
980          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
981        return;
982      }
983      addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
984        (ret, err2) -> {
985          if (err2 != null) {
986            future.completeExceptionally(err2);
987          } else {
988            future.complete(ret);
989          }
990        });
991    });
992    return future;
993  }
994
995  /**
996   * List all region locations for the specific table.
997   */
998  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
999    if (TableName.META_TABLE_NAME.equals(tableName)) {
1000      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
1001      // For meta table, we use zk to fetch all locations.
1002      AsyncRegistry registry = AsyncRegistryFactory.getRegistry(connection.getConfiguration());
1003      addListener(registry.getMetaRegionLocation(), (metaRegions, err) -> {
1004        if (err != null) {
1005          future.completeExceptionally(err);
1006        } else if (metaRegions == null || metaRegions.isEmpty() ||
1007          metaRegions.getDefaultRegionLocation() == null) {
1008          future.completeExceptionally(new IOException("meta region does not found"));
1009        } else {
1010          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
1011        }
1012        // close the registry.
1013        IOUtils.closeQuietly(registry);
1014      });
1015      return future;
1016    } else {
1017      // For non-meta table, we fetch all locations by scanning hbase:meta table
1018      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName));
1019    }
1020  }
1021
1022  /**
1023   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
1024   */
1025  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
1026      CompactType compactType) {
1027    CompletableFuture<Void> future = new CompletableFuture<>();
1028
1029    switch (compactType) {
1030      case MOB:
1031        addListener(connection.registry.getMasterAddress(), (serverName, err) -> {
1032          if (err != null) {
1033            future.completeExceptionally(err);
1034            return;
1035          }
1036          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
1037          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
1038            if (err2 != null) {
1039              future.completeExceptionally(err2);
1040            } else {
1041              future.complete(ret);
1042            }
1043          });
1044        });
1045        break;
1046      case NORMAL:
1047        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
1048          if (err != null) {
1049            future.completeExceptionally(err);
1050            return;
1051          }
1052          CompletableFuture<?>[] compactFutures =
1053            locations.stream().filter(l -> l.getRegion() != null)
1054              .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
1055              .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
1056              .toArray(CompletableFuture<?>[]::new);
1057          // future complete unless all of the compact futures are completed.
1058          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
1059            if (err2 != null) {
1060              future.completeExceptionally(err2);
1061            } else {
1062              future.complete(ret);
1063            }
1064          });
1065        });
1066        break;
1067      default:
1068        throw new IllegalArgumentException("Unknown compactType: " + compactType);
1069    }
1070    return future;
1071  }
1072
1073  /**
1074   * Compact the region at specific region server.
1075   */
1076  private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
1077      final boolean major, byte[] columnFamily) {
1078    return this
1079        .<Void> newAdminCaller()
1080        .serverName(sn)
1081        .action(
1082          (controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, Void> adminCall(
1083            controller, stub, RequestConverter.buildCompactRegionRequest(hri.getRegionName(),
1084              major, columnFamily), (s, c, req, done) -> s.compactRegion(c, req, done),
1085            resp -> null)).call();
1086  }
1087
1088  private byte[] toEncodeRegionName(byte[] regionName) {
1089    try {
1090      return RegionInfo.isEncodedRegionName(regionName) ? regionName
1091          : Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
1092    } catch (IOException e) {
1093      return regionName;
1094    }
1095  }
1096
1097  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
1098      CompletableFuture<TableName> result) {
1099    addListener(getRegionLocation(encodeRegionName), (location, err) -> {
1100      if (err != null) {
1101        result.completeExceptionally(err);
1102        return;
1103      }
1104      RegionInfo regionInfo = location.getRegion();
1105      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1106        result.completeExceptionally(
1107          new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
1108        return;
1109      }
1110      if (!tableName.compareAndSet(null, regionInfo.getTable())) {
1111        if (!tableName.get().equals(regionInfo.getTable())) {
1112          // tables of this two region should be same.
1113          result.completeExceptionally(
1114            new IllegalArgumentException("Cannot merge regions from two different tables " +
1115              tableName.get() + " and " + regionInfo.getTable()));
1116        } else {
1117          result.complete(tableName.get());
1118        }
1119      }
1120    });
1121  }
1122
1123  private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[] encodeRegionNameA,
1124      byte[] encodeRegionNameB) {
1125    AtomicReference<TableName> tableNameRef = new AtomicReference<>();
1126    CompletableFuture<TableName> future = new CompletableFuture<>();
1127
1128    checkAndGetTableName(encodeRegionNameA, tableNameRef, future);
1129    checkAndGetTableName(encodeRegionNameB, tableNameRef, future);
1130    return future;
1131  }
1132
1133  @Override
1134  public CompletableFuture<Boolean> mergeSwitch(boolean on) {
1135    return setSplitOrMergeOn(on, MasterSwitchType.MERGE);
1136  }
1137
1138  @Override
1139  public CompletableFuture<Boolean> isMergeEnabled() {
1140    return isSplitOrMergeOn(MasterSwitchType.MERGE);
1141  }
1142
1143  @Override
1144  public CompletableFuture<Boolean> splitSwitch(boolean on) {
1145    return setSplitOrMergeOn(on, MasterSwitchType.SPLIT);
1146  }
1147
1148  @Override
1149  public CompletableFuture<Boolean> isSplitEnabled() {
1150    return isSplitOrMergeOn(MasterSwitchType.SPLIT);
1151  }
1152
1153  private CompletableFuture<Boolean> setSplitOrMergeOn(boolean on, MasterSwitchType switchType) {
1154    SetSplitOrMergeEnabledRequest request =
1155        RequestConverter.buildSetSplitOrMergeEnabledRequest(on, false, switchType);
1156    return this
1157        .<Boolean> newMasterCaller()
1158        .action(
1159          (controller, stub) -> this
1160              .<SetSplitOrMergeEnabledRequest, SetSplitOrMergeEnabledResponse, Boolean> call(
1161                controller, stub, request, (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req,
1162                  done), (resp) -> resp.getPrevValueList().get(0))).call();
1163  }
1164
1165  private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) {
1166    IsSplitOrMergeEnabledRequest request =
1167        RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType);
1168    return this
1169        .<Boolean> newMasterCaller()
1170        .action(
1171          (controller, stub) -> this
1172              .<IsSplitOrMergeEnabledRequest, IsSplitOrMergeEnabledResponse, Boolean> call(
1173                controller, stub, request,
1174                (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done),
1175                (resp) -> resp.getEnabled())).call();
1176  }
1177
1178  @Override
1179  public CompletableFuture<Void> mergeRegions(byte[] nameOfRegionA, byte[] nameOfRegionB,
1180      boolean forcible) {
1181    CompletableFuture<Void> future = new CompletableFuture<>();
1182    final byte[] encodeRegionNameA = toEncodeRegionName(nameOfRegionA);
1183    final byte[] encodeRegionNameB = toEncodeRegionName(nameOfRegionB);
1184
1185    addListener(checkRegionsAndGetTableName(encodeRegionNameA, encodeRegionNameB),
1186      (tableName, err) -> {
1187        if (err != null) {
1188          future.completeExceptionally(err);
1189          return;
1190        }
1191
1192        MergeTableRegionsRequest request = null;
1193        try {
1194          request = RequestConverter.buildMergeTableRegionsRequest(
1195            new byte[][] { encodeRegionNameA, encodeRegionNameB }, forcible, ng.getNonceGroup(),
1196            ng.newNonce());
1197        } catch (DeserializationException e) {
1198          future.completeExceptionally(e);
1199          return;
1200        }
1201
1202        addListener(
1203          this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(request,
1204            (s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(),
1205            new MergeTableRegionProcedureBiConsumer(tableName)),
1206          (ret, err2) -> {
1207            if (err2 != null) {
1208              future.completeExceptionally(err2);
1209            } else {
1210              future.complete(ret);
1211            }
1212          });
1213      });
1214    return future;
1215  }
1216
1217  @Override
1218  public CompletableFuture<Void> split(TableName tableName) {
1219    CompletableFuture<Void> future = new CompletableFuture<>();
1220    addListener(tableExists(tableName), (exist, error) -> {
1221      if (error != null) {
1222        future.completeExceptionally(error);
1223        return;
1224      }
1225      if (!exist) {
1226        future.completeExceptionally(new TableNotFoundException(tableName));
1227        return;
1228      }
1229      addListener(
1230        metaTable
1231          .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
1232            .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION))
1233            .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION))),
1234        (results, err2) -> {
1235          if (err2 != null) {
1236            future.completeExceptionally(err2);
1237            return;
1238          }
1239          if (results != null && !results.isEmpty()) {
1240            List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
1241            for (Result r : results) {
1242              if (r.isEmpty() || MetaTableAccessor.getRegionInfo(r) == null) {
1243                continue;
1244              }
1245              RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
1246              if (rl != null) {
1247                for (HRegionLocation h : rl.getRegionLocations()) {
1248                  if (h != null && h.getServerName() != null) {
1249                    RegionInfo hri = h.getRegion();
1250                    if (hri == null || hri.isSplitParent() ||
1251                      hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1252                      continue;
1253                    }
1254                    splitFutures.add(split(hri, null));
1255                  }
1256                }
1257              }
1258            }
1259            addListener(
1260              CompletableFuture
1261                .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
1262              (ret, exception) -> {
1263                if (exception != null) {
1264                  future.completeExceptionally(exception);
1265                  return;
1266                }
1267                future.complete(ret);
1268              });
1269          } else {
1270            future.complete(null);
1271          }
1272        });
1273    });
1274    return future;
1275  }
1276
1277  @Override
1278  public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
1279    CompletableFuture<Void> result = new CompletableFuture<>();
1280    if (splitPoint == null) {
1281      return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
1282    }
1283    addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint),
1284      (loc, err) -> {
1285        if (err != null) {
1286          result.completeExceptionally(err);
1287        } else if (loc == null || loc.getRegion() == null) {
1288          result.completeExceptionally(new IllegalArgumentException(
1289            "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
1290        } else {
1291          addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
1292            if (err2 != null) {
1293              result.completeExceptionally(err2);
1294            } else {
1295              result.complete(ret);
1296            }
1297
1298          });
1299        }
1300      });
1301    return result;
1302  }
1303
1304  @Override
1305  public CompletableFuture<Void> splitRegion(byte[] regionName) {
1306    CompletableFuture<Void> future = new CompletableFuture<>();
1307    addListener(getRegionLocation(regionName), (location, err) -> {
1308      RegionInfo regionInfo = location.getRegion();
1309      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1310        future
1311          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
1312            "Replicas are auto-split when their primary is split."));
1313        return;
1314      }
1315      ServerName serverName = location.getServerName();
1316      if (serverName == null) {
1317        future
1318          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1319        return;
1320      }
1321      addListener(split(regionInfo, null), (ret, err2) -> {
1322        if (err2 != null) {
1323          future.completeExceptionally(err2);
1324        } else {
1325          future.complete(ret);
1326        }
1327      });
1328    });
1329    return future;
1330  }
1331
1332  @Override
1333  public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) {
1334    Preconditions.checkNotNull(splitPoint,
1335      "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
1336    CompletableFuture<Void> future = new CompletableFuture<>();
1337    addListener(getRegionLocation(regionName), (location, err) -> {
1338      RegionInfo regionInfo = location.getRegion();
1339      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1340        future
1341          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
1342            "Replicas are auto-split when their primary is split."));
1343        return;
1344      }
1345      ServerName serverName = location.getServerName();
1346      if (serverName == null) {
1347        future
1348          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1349        return;
1350      }
1351      if (regionInfo.getStartKey() != null &&
1352        Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0) {
1353        future.completeExceptionally(
1354          new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
1355        return;
1356      }
1357      addListener(split(regionInfo, splitPoint), (ret, err2) -> {
1358        if (err2 != null) {
1359          future.completeExceptionally(err2);
1360        } else {
1361          future.complete(ret);
1362        }
1363      });
1364    });
1365    return future;
1366  }
1367
1368  private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
1369    CompletableFuture<Void> future = new CompletableFuture<>();
1370    TableName tableName = hri.getTable();
1371    SplitTableRegionRequest request = null;
1372    try {
1373      request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
1374        ng.newNonce());
1375    } catch (DeserializationException e) {
1376      future.completeExceptionally(e);
1377      return future;
1378    }
1379
1380    addListener(this.<SplitTableRegionRequest, SplitTableRegionResponse> procedureCall(request,
1381      (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(),
1382      new SplitTableRegionProcedureBiConsumer(tableName)), (ret, err2) -> {
1383        if (err2 != null) {
1384          future.completeExceptionally(err2);
1385        } else {
1386          future.complete(ret);
1387        }
1388      });
1389    return future;
1390  }
1391
1392  @Override
1393  public CompletableFuture<Void> assign(byte[] regionName) {
1394    CompletableFuture<Void> future = new CompletableFuture<>();
1395    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1396      if (err != null) {
1397        future.completeExceptionally(err);
1398        return;
1399      }
1400      addListener(this.<Void> newMasterCaller()
1401        .action(((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
1402          controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
1403          (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)))
1404        .call(), (ret, err2) -> {
1405          if (err2 != null) {
1406            future.completeExceptionally(err2);
1407          } else {
1408            future.complete(ret);
1409          }
1410        });
1411    });
1412    return future;
1413  }
1414
1415  @Override
1416  public CompletableFuture<Void> unassign(byte[] regionName, boolean forcible) {
1417    CompletableFuture<Void> future = new CompletableFuture<>();
1418    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1419      if (err != null) {
1420        future.completeExceptionally(err);
1421        return;
1422      }
1423      addListener(
1424        this.<Void> newMasterCaller()
1425          .action(((controller, stub) -> this
1426            .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
1427              RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible),
1428              (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)))
1429          .call(),
1430        (ret, err2) -> {
1431          if (err2 != null) {
1432            future.completeExceptionally(err2);
1433          } else {
1434            future.complete(ret);
1435          }
1436        });
1437    });
1438    return future;
1439  }
1440
1441  @Override
1442  public CompletableFuture<Void> offline(byte[] regionName) {
1443    CompletableFuture<Void> future = new CompletableFuture<>();
1444    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1445      if (err != null) {
1446        future.completeExceptionally(err);
1447        return;
1448      }
1449      addListener(
1450        this.<Void> newMasterCaller()
1451          .action(((controller, stub) -> this
1452            .<OfflineRegionRequest, OfflineRegionResponse, Void> call(controller, stub,
1453              RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
1454              (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)))
1455          .call(),
1456        (ret, err2) -> {
1457          if (err2 != null) {
1458            future.completeExceptionally(err2);
1459          } else {
1460            future.complete(ret);
1461          }
1462        });
1463    });
1464    return future;
1465  }
1466
1467  @Override
1468  public CompletableFuture<Void> move(byte[] regionName) {
1469    CompletableFuture<Void> future = new CompletableFuture<>();
1470    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1471      if (err != null) {
1472        future.completeExceptionally(err);
1473        return;
1474      }
1475      addListener(
1476        moveRegion(
1477          RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
1478        (ret, err2) -> {
1479          if (err2 != null) {
1480            future.completeExceptionally(err2);
1481          } else {
1482            future.complete(ret);
1483          }
1484        });
1485    });
1486    return future;
1487  }
1488
1489  @Override
1490  public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) {
1491    Preconditions.checkNotNull(destServerName,
1492      "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
1493    CompletableFuture<Void> future = new CompletableFuture<>();
1494    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1495      if (err != null) {
1496        future.completeExceptionally(err);
1497        return;
1498      }
1499      addListener(moveRegion(RequestConverter
1500        .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
1501        (ret, err2) -> {
1502          if (err2 != null) {
1503            future.completeExceptionally(err2);
1504          } else {
1505            future.complete(ret);
1506          }
1507        });
1508    });
1509    return future;
1510  }
1511
1512  private CompletableFuture<Void> moveRegion(MoveRegionRequest request) {
1513    return this
1514        .<Void> newMasterCaller()
1515        .action(
1516          (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
1517            stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null)).call();
1518  }
1519
1520  @Override
1521  public CompletableFuture<Void> setQuota(QuotaSettings quota) {
1522    return this
1523        .<Void> newMasterCaller()
1524        .action(
1525          (controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
1526            stub, QuotaSettings.buildSetQuotaRequestProto(quota),
1527            (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)).call();
1528  }
1529
1530  @Override
1531  public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
1532    CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
1533    Scan scan = QuotaTableUtil.makeScan(filter);
1534    this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build()
1535        .scan(scan, new AdvancedScanResultConsumer() {
1536          List<QuotaSettings> settings = new ArrayList<>();
1537
1538          @Override
1539          public void onNext(Result[] results, ScanController controller) {
1540            for (Result result : results) {
1541              try {
1542                QuotaTableUtil.parseResultToCollection(result, settings);
1543              } catch (IOException e) {
1544                controller.terminate();
1545                future.completeExceptionally(e);
1546              }
1547            }
1548          }
1549
1550          @Override
1551          public void onError(Throwable error) {
1552            future.completeExceptionally(error);
1553          }
1554
1555          @Override
1556          public void onComplete() {
1557            future.complete(settings);
1558          }
1559        });
1560    return future;
1561  }
1562
1563  @Override
1564  public CompletableFuture<Void> addReplicationPeer(String peerId,
1565      ReplicationPeerConfig peerConfig, boolean enabled) {
1566    return this
1567        .<Void> newMasterCaller()
1568        .action(
1569          (controller, stub) -> this
1570              .<AddReplicationPeerRequest, AddReplicationPeerResponse, Void> call(controller, stub,
1571                RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled), (s,
1572                    c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> null)).call();
1573  }
1574
1575  @Override
1576  public CompletableFuture<Void> removeReplicationPeer(String peerId) {
1577    return this
1578        .<Void> newMasterCaller()
1579        .action(
1580          (controller, stub) -> this
1581              .<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse, Void> call(controller,
1582                stub, RequestConverter.buildRemoveReplicationPeerRequest(peerId),
1583                (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> null)).call();
1584  }
1585
1586  @Override
1587  public CompletableFuture<Void> enableReplicationPeer(String peerId) {
1588    return this
1589        .<Void> newMasterCaller()
1590        .action(
1591          (controller, stub) -> this
1592              .<EnableReplicationPeerRequest, EnableReplicationPeerResponse, Void> call(controller,
1593                stub, RequestConverter.buildEnableReplicationPeerRequest(peerId),
1594                (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> null)).call();
1595  }
1596
1597  @Override
1598  public CompletableFuture<Void> disableReplicationPeer(String peerId) {
1599    return this
1600        .<Void> newMasterCaller()
1601        .action(
1602          (controller, stub) -> this
1603              .<DisableReplicationPeerRequest, DisableReplicationPeerResponse, Void> call(
1604                controller, stub, RequestConverter.buildDisableReplicationPeerRequest(peerId), (s,
1605                    c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> null))
1606        .call();
1607  }
1608
1609  @Override
1610  public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
1611    return this
1612        .<ReplicationPeerConfig> newMasterCaller()
1613        .action(
1614          (controller, stub) -> this
1615              .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(
1616                controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId), (
1617                    s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
1618                (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))).call();
1619  }
1620
1621  @Override
1622  public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
1623      ReplicationPeerConfig peerConfig) {
1624    return this
1625        .<Void> newMasterCaller()
1626        .action(
1627          (controller, stub) -> this
1628              .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse, Void> call(
1629                controller, stub, RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId,
1630                  peerConfig), (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), (
1631                    resp) -> null)).call();
1632  }
1633
1634  @Override
1635  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
1636      Map<TableName, List<String>> tableCfs) {
1637    if (tableCfs == null) {
1638      return failedFuture(new ReplicationException("tableCfs is null"));
1639    }
1640
1641    CompletableFuture<Void> future = new CompletableFuture<Void>();
1642    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1643      if (!completeExceptionally(future, error)) {
1644        ReplicationPeerConfig newPeerConfig =
1645          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
1646        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1647          if (!completeExceptionally(future, error)) {
1648            future.complete(result);
1649          }
1650        });
1651      }
1652    });
1653    return future;
1654  }
1655
1656  @Override
1657  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
1658      Map<TableName, List<String>> tableCfs) {
1659    if (tableCfs == null) {
1660      return failedFuture(new ReplicationException("tableCfs is null"));
1661    }
1662
1663    CompletableFuture<Void> future = new CompletableFuture<Void>();
1664    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1665      if (!completeExceptionally(future, error)) {
1666        ReplicationPeerConfig newPeerConfig = null;
1667        try {
1668          newPeerConfig = ReplicationPeerConfigUtil
1669            .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
1670        } catch (ReplicationException e) {
1671          future.completeExceptionally(e);
1672          return;
1673        }
1674        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1675          if (!completeExceptionally(future, error)) {
1676            future.complete(result);
1677          }
1678        });
1679      }
1680    });
1681    return future;
1682  }
1683
1684  @Override
1685  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() {
1686    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null));
1687  }
1688
1689  @Override
1690  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) {
1691    Preconditions.checkNotNull(pattern,
1692      "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead");
1693    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern));
1694  }
1695
1696  private CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(
1697      ListReplicationPeersRequest request) {
1698    return this
1699        .<List<ReplicationPeerDescription>> newMasterCaller()
1700        .action(
1701          (controller, stub) -> this
1702              .<ListReplicationPeersRequest, ListReplicationPeersResponse, List<ReplicationPeerDescription>> call(
1703                controller,
1704                stub,
1705                request,
1706                (s, c, req, done) -> s.listReplicationPeers(c, req, done),
1707                (resp) -> resp.getPeerDescList().stream()
1708                    .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
1709                    .collect(Collectors.toList()))).call();
1710  }
1711
1712  @Override
1713  public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
1714    CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
1715    addListener(listTableDescriptors(), (tables, error) -> {
1716      if (!completeExceptionally(future, error)) {
1717        List<TableCFs> replicatedTableCFs = new ArrayList<>();
1718        tables.forEach(table -> {
1719          Map<String, Integer> cfs = new HashMap<>();
1720          Stream.of(table.getColumnFamilies())
1721            .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
1722            .forEach(column -> {
1723              cfs.put(column.getNameAsString(), column.getScope());
1724            });
1725          if (!cfs.isEmpty()) {
1726            replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
1727          }
1728        });
1729        future.complete(replicatedTableCFs);
1730      }
1731    });
1732    return future;
1733  }
1734
1735  @Override
1736  public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
1737    SnapshotProtos.SnapshotDescription snapshot =
1738      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
1739    try {
1740      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
1741    } catch (IllegalArgumentException e) {
1742      return failedFuture(e);
1743    }
1744    CompletableFuture<Void> future = new CompletableFuture<>();
1745    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
1746    addListener(this.<Long> newMasterCaller()
1747      .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
1748        stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
1749        resp -> resp.getExpectedTimeout()))
1750      .call(), (expectedTimeout, err) -> {
1751        if (err != null) {
1752          future.completeExceptionally(err);
1753          return;
1754        }
1755        TimerTask pollingTask = new TimerTask() {
1756          int tries = 0;
1757          long startTime = EnvironmentEdgeManager.currentTime();
1758          long endTime = startTime + expectedTimeout;
1759          long maxPauseTime = expectedTimeout / maxAttempts;
1760
1761          @Override
1762          public void run(Timeout timeout) throws Exception {
1763            if (EnvironmentEdgeManager.currentTime() < endTime) {
1764              addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> {
1765                if (err2 != null) {
1766                  future.completeExceptionally(err2);
1767                } else if (done) {
1768                  future.complete(null);
1769                } else {
1770                  // retry again after pauseTime.
1771                  long pauseTime =
1772                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
1773                  pauseTime = Math.min(pauseTime, maxPauseTime);
1774                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
1775                    TimeUnit.MILLISECONDS);
1776                }
1777              });
1778            } else {
1779              future.completeExceptionally(
1780                new SnapshotCreationException("Snapshot '" + snapshot.getName() +
1781                  "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshotDesc));
1782            }
1783          }
1784        };
1785        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
1786      });
1787    return future;
1788  }
1789
1790  @Override
1791  public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
1792    return this
1793        .<Boolean> newMasterCaller()
1794        .action(
1795          (controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> call(
1796            controller,
1797            stub,
1798            IsSnapshotDoneRequest.newBuilder()
1799                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
1800                req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())).call();
1801  }
1802
1803  @Override
1804  public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
1805    boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
1806      HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
1807      HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
1808    return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
1809  }
1810
1811  @Override
1812  public CompletableFuture<Void> restoreSnapshot(String snapshotName,
1813      boolean takeFailSafeSnapshot) {
1814    CompletableFuture<Void> future = new CompletableFuture<>();
1815    addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
1816      if (err != null) {
1817        future.completeExceptionally(err);
1818        return;
1819      }
1820      TableName tableName = null;
1821      if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
1822        for (SnapshotDescription snap : snapshotDescriptions) {
1823          if (snap.getName().equals(snapshotName)) {
1824            tableName = snap.getTableName();
1825            break;
1826          }
1827        }
1828      }
1829      if (tableName == null) {
1830        future.completeExceptionally(new RestoreSnapshotException(
1831          "Unable to find the table name for snapshot=" + snapshotName));
1832        return;
1833      }
1834      final TableName finalTableName = tableName;
1835      addListener(tableExists(finalTableName), (exists, err2) -> {
1836        if (err2 != null) {
1837          future.completeExceptionally(err2);
1838        } else if (!exists) {
1839          // if table does not exist, then just clone snapshot into new table.
1840          completeConditionalOnFuture(future,
1841            internalRestoreSnapshot(snapshotName, finalTableName));
1842        } else {
1843          addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
1844            if (err4 != null) {
1845              future.completeExceptionally(err4);
1846            } else if (!disabled) {
1847              future.completeExceptionally(new TableNotDisabledException(finalTableName));
1848            } else {
1849              completeConditionalOnFuture(future,
1850                restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot));
1851            }
1852          });
1853        }
1854      });
1855    });
1856    return future;
1857  }
1858
1859  private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
1860      boolean takeFailSafeSnapshot) {
1861    if (takeFailSafeSnapshot) {
1862      CompletableFuture<Void> future = new CompletableFuture<>();
1863      // Step.1 Take a snapshot of the current state
1864      String failSafeSnapshotSnapshotNameFormat =
1865        this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
1866          HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
1867      final String failSafeSnapshotSnapshotName =
1868        failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
1869          .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
1870          .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
1871      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
1872      addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
1873        if (err != null) {
1874          future.completeExceptionally(err);
1875        } else {
1876          // Step.2 Restore snapshot
1877          addListener(internalRestoreSnapshot(snapshotName, tableName), (void2, err2) -> {
1878            if (err2 != null) {
1879              // Step.3.a Something went wrong during the restore and try to rollback.
1880              addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName),
1881                (void3, err3) -> {
1882                  if (err3 != null) {
1883                    future.completeExceptionally(err3);
1884                  } else {
1885                    String msg =
1886                      "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" +
1887                        failSafeSnapshotSnapshotName + " succeeded.";
1888                    future.completeExceptionally(new RestoreSnapshotException(msg));
1889                  }
1890                });
1891            } else {
1892              // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
1893              LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
1894              addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
1895                if (err3 != null) {
1896                  LOG.error(
1897                    "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
1898                    err3);
1899                  future.completeExceptionally(err3);
1900                } else {
1901                  future.complete(ret3);
1902                }
1903              });
1904            }
1905          });
1906        }
1907      });
1908      return future;
1909    } else {
1910      return internalRestoreSnapshot(snapshotName, tableName);
1911    }
1912  }
1913
1914  private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
1915      CompletableFuture<T> parentFuture) {
1916    addListener(parentFuture, (res, err) -> {
1917      if (err != null) {
1918        dependentFuture.completeExceptionally(err);
1919      } else {
1920        dependentFuture.complete(res);
1921      }
1922    });
1923  }
1924
1925  @Override
1926  public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName) {
1927    CompletableFuture<Void> future = new CompletableFuture<>();
1928    addListener(tableExists(tableName), (exists, err) -> {
1929      if (err != null) {
1930        future.completeExceptionally(err);
1931      } else if (exists) {
1932        future.completeExceptionally(new TableExistsException(tableName));
1933      } else {
1934        completeConditionalOnFuture(future, internalRestoreSnapshot(snapshotName, tableName));
1935      }
1936    });
1937    return future;
1938  }
1939
1940  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName) {
1941    SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
1942        .setName(snapshotName).setTable(tableName.getNameAsString()).build();
1943    try {
1944      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
1945    } catch (IllegalArgumentException e) {
1946      return failedFuture(e);
1947    }
1948    return waitProcedureResult(this
1949        .<Long> newMasterCaller()
1950        .action(
1951          (controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> call(
1952            controller, stub, RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot)
1953                .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build(), (s, c, req,
1954                done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId())).call());
1955  }
1956
1957  @Override
1958  public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
1959    return getCompletedSnapshots(null);
1960  }
1961
1962  @Override
1963  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
1964    Preconditions.checkNotNull(pattern,
1965      "pattern is null. If you don't specify a pattern, use listSnapshots() instead");
1966    return getCompletedSnapshots(pattern);
1967  }
1968
1969  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) {
1970    return this.<List<SnapshotDescription>> newMasterCaller().action((controller, stub) -> this
1971        .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, List<SnapshotDescription>>
1972        call(controller, stub, GetCompletedSnapshotsRequest.newBuilder().build(),
1973          (s, c, req, done) -> s.getCompletedSnapshots(c, req, done),
1974          resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern)))
1975        .call();
1976  }
1977
1978  @Override
1979  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) {
1980    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
1981        + " If you don't specify a tableNamePattern, use listSnapshots() instead");
1982    return getCompletedSnapshots(tableNamePattern, null);
1983  }
1984
1985  @Override
1986  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
1987      Pattern snapshotNamePattern) {
1988    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
1989        + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead");
1990    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
1991        + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead");
1992    return getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
1993  }
1994
1995  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(
1996      Pattern tableNamePattern, Pattern snapshotNamePattern) {
1997    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
1998    addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
1999      if (err != null) {
2000        future.completeExceptionally(err);
2001        return;
2002      }
2003      if (tableNames == null || tableNames.size() <= 0) {
2004        future.complete(Collections.emptyList());
2005        return;
2006      }
2007      addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
2008        if (err2 != null) {
2009          future.completeExceptionally(err2);
2010          return;
2011        }
2012        if (snapshotDescList == null || snapshotDescList.isEmpty()) {
2013          future.complete(Collections.emptyList());
2014          return;
2015        }
2016        future.complete(snapshotDescList.stream()
2017          .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
2018          .collect(Collectors.toList()));
2019      });
2020    });
2021    return future;
2022  }
2023
2024  @Override
2025  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
2026    return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
2027  }
2028
2029  @Override
2030  public CompletableFuture<Void> deleteSnapshots() {
2031    return internalDeleteSnapshots(null, null);
2032  }
2033
2034  @Override
2035  public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
2036    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2037        + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead");
2038    return internalDeleteSnapshots(null, snapshotNamePattern);
2039  }
2040
2041  @Override
2042  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) {
2043    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2044        + " If you don't specify a tableNamePattern, use deleteSnapshots() instead");
2045    return internalDeleteSnapshots(tableNamePattern, null);
2046  }
2047
2048  @Override
2049  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
2050      Pattern snapshotNamePattern) {
2051    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2052        + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead");
2053    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2054        + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead");
2055    return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern);
2056  }
2057
2058  private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern,
2059      Pattern snapshotNamePattern) {
2060    CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture;
2061    if (tableNamePattern == null) {
2062      listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern);
2063    } else {
2064      listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2065    }
2066    CompletableFuture<Void> future = new CompletableFuture<>();
2067    addListener(listSnapshotsFuture, ((snapshotDescriptions, err) -> {
2068      if (err != null) {
2069        future.completeExceptionally(err);
2070        return;
2071      }
2072      if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
2073        future.complete(null);
2074        return;
2075      }
2076      addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
2077        .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
2078          if (e != null) {
2079            future.completeExceptionally(e);
2080          } else {
2081            future.complete(v);
2082          }
2083        });
2084    }));
2085    return future;
2086  }
2087
2088  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
2089    return this
2090        .<Void> newMasterCaller()
2091        .action(
2092          (controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
2093            controller,
2094            stub,
2095            DeleteSnapshotRequest.newBuilder()
2096                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
2097                req, done) -> s.deleteSnapshot(c, req, done), resp -> null)).call();
2098  }
2099
2100  @Override
2101  public CompletableFuture<Void> execProcedure(String signature, String instance,
2102      Map<String, String> props) {
2103    CompletableFuture<Void> future = new CompletableFuture<>();
2104    ProcedureDescription procDesc =
2105      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2106    addListener(this.<Long> newMasterCaller()
2107      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
2108        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
2109        (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
2110      .call(), (expectedTimeout, err) -> {
2111        if (err != null) {
2112          future.completeExceptionally(err);
2113          return;
2114        }
2115        TimerTask pollingTask = new TimerTask() {
2116          int tries = 0;
2117          long startTime = EnvironmentEdgeManager.currentTime();
2118          long endTime = startTime + expectedTimeout;
2119          long maxPauseTime = expectedTimeout / maxAttempts;
2120
2121          @Override
2122          public void run(Timeout timeout) throws Exception {
2123            if (EnvironmentEdgeManager.currentTime() < endTime) {
2124              addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
2125                if (err2 != null) {
2126                  future.completeExceptionally(err2);
2127                  return;
2128                }
2129                if (done) {
2130                  future.complete(null);
2131                } else {
2132                  // retry again after pauseTime.
2133                  long pauseTime =
2134                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2135                  pauseTime = Math.min(pauseTime, maxPauseTime);
2136                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
2137                    TimeUnit.MICROSECONDS);
2138                }
2139              });
2140            } else {
2141              future.completeExceptionally(new IOException("Procedure '" + signature + " : " +
2142                instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
2143            }
2144          }
2145        };
2146        // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
2147        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2148      });
2149    return future;
2150  }
2151
2152  @Override
2153  public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance,
2154      Map<String, String> props) {
2155    ProcedureDescription proDesc =
2156        ProtobufUtil.buildProcedureDescription(signature, instance, props);
2157    return this.<byte[]> newMasterCaller()
2158        .action(
2159          (controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
2160            controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
2161            (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
2162            resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
2163        .call();
2164  }
2165
2166  @Override
2167  public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
2168      Map<String, String> props) {
2169    ProcedureDescription proDesc =
2170        ProtobufUtil.buildProcedureDescription(signature, instance, props);
2171    return this.<Boolean> newMasterCaller()
2172        .action((controller, stub) -> this
2173            .<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(controller, stub,
2174              IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
2175              (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
2176        .call();
2177  }
2178
2179  @Override
2180  public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
2181    return this.<Boolean> newMasterCaller().action(
2182      (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
2183        controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
2184        (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
2185        .call();
2186  }
2187
2188  @Override
2189  public CompletableFuture<String> getProcedures() {
2190    return this
2191        .<String> newMasterCaller()
2192        .action(
2193          (controller, stub) -> this
2194              .<GetProceduresRequest, GetProceduresResponse, String> call(
2195                controller, stub, GetProceduresRequest.newBuilder().build(),
2196                (s, c, req, done) -> s.getProcedures(c, req, done),
2197                resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList()))).call();
2198  }
2199
2200  @Override
2201  public CompletableFuture<String> getLocks() {
2202    return this
2203        .<String> newMasterCaller()
2204        .action(
2205          (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(
2206            controller, stub, GetLocksRequest.newBuilder().build(),
2207            (s, c, req, done) -> s.getLocks(c, req, done),
2208            resp -> ProtobufUtil.toLockJson(resp.getLockList()))).call();
2209  }
2210
2211  @Override
2212  public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers, boolean offload) {
2213    return this.<Void> newMasterCaller()
2214        .action((controller, stub) -> this
2215          .<DecommissionRegionServersRequest, DecommissionRegionServersResponse, Void> call(
2216            controller, stub, RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
2217            (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
2218        .call();
2219  }
2220
2221  @Override
2222  public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
2223    return this.<List<ServerName>> newMasterCaller()
2224        .action((controller, stub) -> this
2225          .<ListDecommissionedRegionServersRequest, ListDecommissionedRegionServersResponse,
2226            List<ServerName>> call(
2227              controller, stub, ListDecommissionedRegionServersRequest.newBuilder().build(),
2228              (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
2229              resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
2230                  .collect(Collectors.toList())))
2231        .call();
2232  }
2233
2234  @Override
2235  public CompletableFuture<Void> recommissionRegionServer(ServerName server,
2236      List<byte[]> encodedRegionNames) {
2237    return this.<Void> newMasterCaller()
2238        .action((controller, stub) -> this
2239          .<RecommissionRegionServerRequest, RecommissionRegionServerResponse, Void> call(controller,
2240            stub, RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames),
2241            (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null))
2242        .call();
2243  }
2244
2245  /**
2246   * Get the region location for the passed region name. The region name may be a full region name
2247   * or encoded region name. If the region does not found, then it'll throw an
2248   * UnknownRegionException wrapped by a {@link CompletableFuture}
2249   * @param regionNameOrEncodedRegionName
2250   * @return region location, wrapped by a {@link CompletableFuture}
2251   */
2252  @VisibleForTesting
2253  CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
2254    if (regionNameOrEncodedRegionName == null) {
2255      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2256    }
2257    try {
2258      CompletableFuture<Optional<HRegionLocation>> future;
2259      if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
2260        future = AsyncMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
2261          regionNameOrEncodedRegionName);
2262      } else {
2263        future = AsyncMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
2264      }
2265
2266      CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
2267      addListener(future, (location, err) -> {
2268        if (err != null) {
2269          returnedFuture.completeExceptionally(err);
2270          return;
2271        }
2272        if (!location.isPresent() || location.get().getRegion() == null) {
2273          returnedFuture.completeExceptionally(
2274            new UnknownRegionException("Invalid region name or encoded region name: " +
2275              Bytes.toStringBinary(regionNameOrEncodedRegionName)));
2276        } else {
2277          returnedFuture.complete(location.get());
2278        }
2279      });
2280      return returnedFuture;
2281    } catch (IOException e) {
2282      return failedFuture(e);
2283    }
2284  }
2285
2286  /**
2287   * Get the region info for the passed region name. The region name may be a full region name or
2288   * encoded region name. If the region does not found, then it'll throw an UnknownRegionException
2289   * wrapped by a {@link CompletableFuture}
2290   * @param regionNameOrEncodedRegionName
2291   * @return region info, wrapped by a {@link CompletableFuture}
2292   */
2293  private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
2294    if (regionNameOrEncodedRegionName == null) {
2295      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2296    }
2297
2298    if (Bytes.equals(regionNameOrEncodedRegionName,
2299      RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) ||
2300      Bytes.equals(regionNameOrEncodedRegionName,
2301        RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
2302      return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
2303    }
2304
2305    CompletableFuture<RegionInfo> future = new CompletableFuture<>();
2306    addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
2307      if (err != null) {
2308        future.completeExceptionally(err);
2309      } else {
2310        future.complete(location.getRegion());
2311      }
2312    });
2313    return future;
2314  }
2315
2316  private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
2317    if (numRegions < 3) {
2318      throw new IllegalArgumentException("Must create at least three regions");
2319    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
2320      throw new IllegalArgumentException("Start key must be smaller than end key");
2321    }
2322    if (numRegions == 3) {
2323      return new byte[][] { startKey, endKey };
2324    }
2325    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
2326    if (splitKeys == null || splitKeys.length != numRegions - 1) {
2327      throw new IllegalArgumentException("Unable to split key range into enough regions");
2328    }
2329    return splitKeys;
2330  }
2331
2332  private void verifySplitKeys(byte[][] splitKeys) {
2333    Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
2334    // Verify there are no duplicate split keys
2335    byte[] lastKey = null;
2336    for (byte[] splitKey : splitKeys) {
2337      if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
2338        throw new IllegalArgumentException("Empty split key must not be passed in the split keys.");
2339      }
2340      if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
2341        throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: "
2342            + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey));
2343      }
2344      lastKey = splitKey;
2345    }
2346  }
2347
2348  private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
2349
2350    abstract void onFinished();
2351
2352    abstract void onError(Throwable error);
2353
2354    @Override
2355    public void accept(Void v, Throwable error) {
2356      if (error != null) {
2357        onError(error);
2358        return;
2359      }
2360      onFinished();
2361    }
2362  }
2363
2364  private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
2365    protected final TableName tableName;
2366
2367    TableProcedureBiConsumer(TableName tableName) {
2368      this.tableName = tableName;
2369    }
2370
2371    abstract String getOperationType();
2372
2373    String getDescription() {
2374      return "Operation: " + getOperationType() + ", " + "Table Name: "
2375          + tableName.getNameWithNamespaceInclAsString();
2376    }
2377
2378    @Override
2379    void onFinished() {
2380      LOG.info(getDescription() + " completed");
2381    }
2382
2383    @Override
2384    void onError(Throwable error) {
2385      LOG.info(getDescription() + " failed with " + error.getMessage());
2386    }
2387  }
2388
2389  private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
2390    protected final String namespaceName;
2391
2392    NamespaceProcedureBiConsumer(String namespaceName) {
2393      this.namespaceName = namespaceName;
2394    }
2395
2396    abstract String getOperationType();
2397
2398    String getDescription() {
2399      return "Operation: " + getOperationType() + ", Namespace: " + namespaceName;
2400    }
2401
2402    @Override
2403    void onFinished() {
2404      LOG.info(getDescription() + " completed");
2405    }
2406
2407    @Override
2408    void onError(Throwable error) {
2409      LOG.info(getDescription() + " failed with " + error.getMessage());
2410    }
2411  }
2412
2413  private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2414
2415    CreateTableProcedureBiConsumer(TableName tableName) {
2416      super(tableName);
2417    }
2418
2419    @Override
2420    String getOperationType() {
2421      return "CREATE";
2422    }
2423  }
2424
2425  private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
2426
2427    ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2428      super(tableName);
2429    }
2430
2431    @Override
2432    String getOperationType() {
2433      return "ENABLE";
2434    }
2435  }
2436
2437  private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
2438
2439    DeleteTableProcedureBiConsumer(TableName tableName) {
2440      super(tableName);
2441    }
2442
2443    @Override
2444    String getOperationType() {
2445      return "DELETE";
2446    }
2447
2448    @Override
2449    void onFinished() {
2450      connection.getLocator().clearCache(this.tableName);
2451      super.onFinished();
2452    }
2453  }
2454
2455  private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2456
2457    TruncateTableProcedureBiConsumer(TableName tableName) {
2458      super(tableName);
2459    }
2460
2461    @Override
2462    String getOperationType() {
2463      return "TRUNCATE";
2464    }
2465  }
2466
2467  private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2468
2469    EnableTableProcedureBiConsumer(TableName tableName) {
2470      super(tableName);
2471    }
2472
2473    @Override
2474    String getOperationType() {
2475      return "ENABLE";
2476    }
2477  }
2478
2479  private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2480
2481    DisableTableProcedureBiConsumer(TableName tableName) {
2482      super(tableName);
2483    }
2484
2485    @Override
2486    String getOperationType() {
2487      return "DISABLE";
2488    }
2489  }
2490
2491  private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2492
2493    AddColumnFamilyProcedureBiConsumer(TableName tableName) {
2494      super(tableName);
2495    }
2496
2497    @Override
2498    String getOperationType() {
2499      return "ADD_COLUMN_FAMILY";
2500    }
2501  }
2502
2503  private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2504
2505    DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
2506      super(tableName);
2507    }
2508
2509    @Override
2510    String getOperationType() {
2511      return "DELETE_COLUMN_FAMILY";
2512    }
2513  }
2514
2515  private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2516
2517    ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
2518      super(tableName);
2519    }
2520
2521    @Override
2522    String getOperationType() {
2523      return "MODIFY_COLUMN_FAMILY";
2524    }
2525  }
2526
2527  private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2528
2529    CreateNamespaceProcedureBiConsumer(String namespaceName) {
2530      super(namespaceName);
2531    }
2532
2533    @Override
2534    String getOperationType() {
2535      return "CREATE_NAMESPACE";
2536    }
2537  }
2538
2539  private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2540
2541    DeleteNamespaceProcedureBiConsumer(String namespaceName) {
2542      super(namespaceName);
2543    }
2544
2545    @Override
2546    String getOperationType() {
2547      return "DELETE_NAMESPACE";
2548    }
2549  }
2550
2551  private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2552
2553    ModifyNamespaceProcedureBiConsumer(String namespaceName) {
2554      super(namespaceName);
2555    }
2556
2557    @Override
2558    String getOperationType() {
2559      return "MODIFY_NAMESPACE";
2560    }
2561  }
2562
2563  private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2564
2565    MergeTableRegionProcedureBiConsumer(TableName tableName) {
2566      super(tableName);
2567    }
2568
2569    @Override
2570    String getOperationType() {
2571      return "MERGE_REGIONS";
2572    }
2573  }
2574
2575  private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2576
2577    SplitTableRegionProcedureBiConsumer(TableName tableName) {
2578      super(tableName);
2579    }
2580
2581    @Override
2582    String getOperationType() {
2583      return "SPLIT_REGION";
2584    }
2585  }
2586
2587  private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
2588    CompletableFuture<Void> future = new CompletableFuture<>();
2589    addListener(procFuture, (procId, error) -> {
2590      if (error != null) {
2591        future.completeExceptionally(error);
2592        return;
2593      }
2594      getProcedureResult(procId, future, 0);
2595    });
2596    return future;
2597  }
2598
2599  private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) {
2600    addListener(
2601      this.<GetProcedureResultResponse> newMasterCaller()
2602        .action((controller, stub) -> this
2603          .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call(
2604            controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
2605            (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
2606        .call(),
2607      (response, error) -> {
2608        if (error != null) {
2609          LOG.warn("failed to get the procedure result procId={}", procId,
2610            ConnectionUtils.translateException(error));
2611          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2612            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2613          return;
2614        }
2615        if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
2616          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2617            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2618          return;
2619        }
2620        if (response.hasException()) {
2621          IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
2622          future.completeExceptionally(ioe);
2623        } else {
2624          future.complete(null);
2625        }
2626      });
2627  }
2628
2629  private <T> CompletableFuture<T> failedFuture(Throwable error) {
2630    CompletableFuture<T> future = new CompletableFuture<>();
2631    future.completeExceptionally(error);
2632    return future;
2633  }
2634
2635  private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) {
2636    if (error != null) {
2637      future.completeExceptionally(error);
2638      return true;
2639    }
2640    return false;
2641  }
2642
2643  @Override
2644  public CompletableFuture<ClusterMetrics> getClusterMetrics() {
2645    return getClusterMetrics(EnumSet.allOf(Option.class));
2646  }
2647
2648  @Override
2649  public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) {
2650    return this
2651        .<ClusterMetrics> newMasterCaller()
2652        .action(
2653          (controller, stub) -> this
2654              .<GetClusterStatusRequest, GetClusterStatusResponse, ClusterMetrics> call(controller,
2655                stub, RequestConverter.buildGetClusterStatusRequest(options),
2656                (s, c, req, done) -> s.getClusterStatus(c, req, done),
2657                resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus()))).call();
2658  }
2659
2660  @Override
2661  public CompletableFuture<Void> shutdown() {
2662    return this
2663        .<Void> newMasterCaller()
2664        .action(
2665          (controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
2666            stub, ShutdownRequest.newBuilder().build(),
2667            (s, c, req, done) -> s.shutdown(c, req, done), resp -> null)).call();
2668  }
2669
2670  @Override
2671  public CompletableFuture<Void> stopMaster() {
2672    return this
2673        .<Void> newMasterCaller()
2674        .action(
2675          (controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(controller,
2676            stub, StopMasterRequest.newBuilder().build(),
2677            (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null)).call();
2678  }
2679
2680  @Override
2681  public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
2682    StopServerRequest request =
2683        RequestConverter.buildStopServerRequest("Called by admin client "
2684            + this.connection.toString());
2685    return this
2686        .<Void> newAdminCaller()
2687        .action(
2688          (controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
2689            controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
2690            resp -> null)).serverName(serverName).call();
2691  }
2692
2693  @Override
2694  public CompletableFuture<Void> updateConfiguration(ServerName serverName) {
2695    return this
2696        .<Void> newAdminCaller()
2697        .action(
2698          (controller, stub) -> this
2699              .<UpdateConfigurationRequest, UpdateConfigurationResponse, Void> adminCall(
2700                controller, stub, UpdateConfigurationRequest.getDefaultInstance(),
2701                (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null))
2702        .serverName(serverName).call();
2703  }
2704
2705  @Override
2706  public CompletableFuture<Void> updateConfiguration() {
2707    CompletableFuture<Void> future = new CompletableFuture<Void>();
2708    addListener(
2709      getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS, Option.MASTER, Option.BACKUP_MASTERS)),
2710      (status, err) -> {
2711        if (err != null) {
2712          future.completeExceptionally(err);
2713        } else {
2714          List<CompletableFuture<Void>> futures = new ArrayList<>();
2715          status.getLiveServerMetrics().keySet()
2716            .forEach(server -> futures.add(updateConfiguration(server)));
2717          futures.add(updateConfiguration(status.getMasterName()));
2718          status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
2719          addListener(
2720            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
2721            (result, err2) -> {
2722              if (err2 != null) {
2723                future.completeExceptionally(err2);
2724              } else {
2725                future.complete(result);
2726              }
2727            });
2728        }
2729      });
2730    return future;
2731  }
2732
2733  @Override
2734  public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
2735    return this
2736        .<Void> newAdminCaller()
2737        .action(
2738          (controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse, Void> adminCall(
2739            controller, stub, RequestConverter.buildRollWALWriterRequest(),
2740            (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null))
2741        .serverName(serverName).call();
2742  }
2743
2744  @Override
2745  public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
2746    return this
2747        .<Void> newAdminCaller()
2748        .action(
2749          (controller, stub) -> this
2750              .<ClearCompactionQueuesRequest, ClearCompactionQueuesResponse, Void> adminCall(
2751                controller, stub, RequestConverter.buildClearCompactionQueuesRequest(queues), (s,
2752                    c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
2753        .serverName(serverName).call();
2754  }
2755
2756  @Override
2757  public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
2758    return this
2759        .<List<SecurityCapability>> newMasterCaller()
2760        .action(
2761          (controller, stub) -> this
2762              .<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, List<SecurityCapability>> call(
2763                controller, stub, SecurityCapabilitiesRequest.newBuilder().build(), (s, c, req,
2764                    done) -> s.getSecurityCapabilities(c, req, done), (resp) -> ProtobufUtil
2765                    .toSecurityCapabilityList(resp.getCapabilitiesList()))).call();
2766  }
2767
2768  @Override
2769  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) {
2770    return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName);
2771  }
2772
2773  @Override
2774  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName,
2775      TableName tableName) {
2776    Preconditions.checkNotNull(tableName,
2777      "tableName is null. If you don't specify a tableName, use getRegionLoads() instead");
2778    return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName);
2779  }
2780
2781  private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request,
2782      ServerName serverName) {
2783    return this.<List<RegionMetrics>> newAdminCaller()
2784        .action((controller, stub) -> this
2785            .<GetRegionLoadRequest, GetRegionLoadResponse, List<RegionMetrics>>
2786              adminCall(controller, stub, request, (s, c, req, done) ->
2787                s.getRegionLoad(controller, req, done), RegionMetricsBuilder::toRegionMetrics))
2788        .serverName(serverName).call();
2789  }
2790
2791  @Override
2792  public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
2793    return this
2794        .<Boolean> newMasterCaller()
2795        .action(
2796          (controller, stub) -> this
2797              .<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, Boolean> call(controller,
2798                stub, IsInMaintenanceModeRequest.newBuilder().build(),
2799                (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
2800                resp -> resp.getInMaintenanceMode())).call();
2801  }
2802
2803  @Override
2804  public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
2805      CompactType compactType) {
2806    CompletableFuture<CompactionState> future = new CompletableFuture<>();
2807
2808    switch (compactType) {
2809      case MOB:
2810        addListener(connection.registry.getMasterAddress(), (serverName, err) -> {
2811          if (err != null) {
2812            future.completeExceptionally(err);
2813            return;
2814          }
2815          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
2816
2817          addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
2818            .action((controller, stub) -> this
2819              .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
2820                controller, stub,
2821                RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
2822                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
2823            .call(), (resp2, err2) -> {
2824              if (err2 != null) {
2825                future.completeExceptionally(err2);
2826              } else {
2827                if (resp2.hasCompactionState()) {
2828                  future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
2829                } else {
2830                  future.complete(CompactionState.NONE);
2831                }
2832              }
2833            });
2834        });
2835        break;
2836      case NORMAL:
2837        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
2838          if (err != null) {
2839            future.completeExceptionally(err);
2840            return;
2841          }
2842          List<CompactionState> regionStates = new ArrayList<>();
2843          List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
2844          locations.stream().filter(loc -> loc.getServerName() != null)
2845            .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
2846            .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
2847              futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
2848                // If any region compaction state is MAJOR_AND_MINOR
2849                // the table compaction state is MAJOR_AND_MINOR, too.
2850                if (err2 != null) {
2851                  future.completeExceptionally(unwrapCompletionException(err2));
2852                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
2853                  future.complete(regionState);
2854                } else {
2855                  regionStates.add(regionState);
2856                }
2857              }));
2858            });
2859          addListener(
2860            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
2861            (ret, err3) -> {
2862              // If future not completed, check all regions's compaction state
2863              if (!future.isCompletedExceptionally() && !future.isDone()) {
2864                CompactionState state = CompactionState.NONE;
2865                for (CompactionState regionState : regionStates) {
2866                  switch (regionState) {
2867                    case MAJOR:
2868                      if (state == CompactionState.MINOR) {
2869                        future.complete(CompactionState.MAJOR_AND_MINOR);
2870                      } else {
2871                        state = CompactionState.MAJOR;
2872                      }
2873                      break;
2874                    case MINOR:
2875                      if (state == CompactionState.MAJOR) {
2876                        future.complete(CompactionState.MAJOR_AND_MINOR);
2877                      } else {
2878                        state = CompactionState.MINOR;
2879                      }
2880                      break;
2881                    case NONE:
2882                    default:
2883                  }
2884                  if (!future.isDone()) {
2885                    future.complete(state);
2886                  }
2887                }
2888              }
2889            });
2890        });
2891        break;
2892      default:
2893        throw new IllegalArgumentException("Unknown compactType: " + compactType);
2894    }
2895
2896    return future;
2897  }
2898
2899  @Override
2900  public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
2901    CompletableFuture<CompactionState> future = new CompletableFuture<>();
2902    addListener(getRegionLocation(regionName), (location, err) -> {
2903      if (err != null) {
2904        future.completeExceptionally(err);
2905        return;
2906      }
2907      ServerName serverName = location.getServerName();
2908      if (serverName == null) {
2909        future
2910          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
2911        return;
2912      }
2913      addListener(
2914        this.<GetRegionInfoResponse> newAdminCaller()
2915          .action((controller, stub) -> this
2916            .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
2917              controller, stub,
2918              RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(),
2919                true),
2920              (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
2921          .serverName(serverName).call(),
2922        (resp2, err2) -> {
2923          if (err2 != null) {
2924            future.completeExceptionally(err2);
2925          } else {
2926            if (resp2.hasCompactionState()) {
2927              future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
2928            } else {
2929              future.complete(CompactionState.NONE);
2930            }
2931          }
2932        });
2933    });
2934    return future;
2935  }
2936
2937  @Override
2938  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
2939    MajorCompactionTimestampRequest request =
2940        MajorCompactionTimestampRequest.newBuilder()
2941            .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
2942    return this
2943        .<Optional<Long>> newMasterCaller()
2944        .action(
2945          (controller, stub) -> this
2946              .<MajorCompactionTimestampRequest, MajorCompactionTimestampResponse, Optional<Long>> call(
2947                controller, stub, request,
2948                (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done),
2949                ProtobufUtil::toOptionalTimestamp)).call();
2950  }
2951
2952  @Override
2953  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion(
2954      byte[] regionName) {
2955    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
2956    // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
2957    addListener(getRegionInfo(regionName), (region, err) -> {
2958      if (err != null) {
2959        future.completeExceptionally(err);
2960        return;
2961      }
2962      MajorCompactionTimestampForRegionRequest.Builder builder =
2963        MajorCompactionTimestampForRegionRequest.newBuilder();
2964      builder.setRegion(
2965        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
2966      addListener(this.<Optional<Long>> newMasterCaller().action((controller, stub) -> this
2967        .<MajorCompactionTimestampForRegionRequest,
2968        MajorCompactionTimestampResponse, Optional<Long>> call(
2969          controller, stub, builder.build(),
2970          (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done),
2971          ProtobufUtil::toOptionalTimestamp))
2972        .call(), (timestamp, err2) -> {
2973          if (err2 != null) {
2974            future.completeExceptionally(err2);
2975          } else {
2976            future.complete(timestamp);
2977          }
2978        });
2979    });
2980    return future;
2981  }
2982
2983  @Override
2984  public CompletableFuture<Boolean> balancerSwitch(final boolean on) {
2985    return this
2986        .<Boolean> newMasterCaller()
2987        .action(
2988          (controller, stub) -> this
2989              .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller,
2990                stub, RequestConverter.buildSetBalancerRunningRequest(on, true),
2991                (s, c, req, done) -> s.setBalancerRunning(c, req, done),
2992                (resp) -> resp.getPrevBalanceValue())).call();
2993  }
2994
2995  @Override
2996  public CompletableFuture<Boolean> balance(boolean forcible) {
2997    return this
2998        .<Boolean> newMasterCaller()
2999        .action(
3000          (controller, stub) -> this.<BalanceRequest, BalanceResponse, Boolean> call(controller,
3001            stub, RequestConverter.buildBalanceRequest(forcible),
3002            (s, c, req, done) -> s.balance(c, req, done), (resp) -> resp.getBalancerRan())).call();
3003  }
3004
3005  @Override
3006  public CompletableFuture<Boolean> isBalancerEnabled() {
3007    return this
3008        .<Boolean> newMasterCaller()
3009        .action(
3010          (controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(
3011            controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
3012            (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3013        .call();
3014  }
3015
3016  @Override
3017  public CompletableFuture<Boolean> normalizerSwitch(boolean on) {
3018    return this
3019        .<Boolean> newMasterCaller()
3020        .action(
3021          (controller, stub) -> this
3022              .<SetNormalizerRunningRequest, SetNormalizerRunningResponse, Boolean> call(
3023                controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on), (s, c,
3024                    req, done) -> s.setNormalizerRunning(c, req, done), (resp) -> resp
3025                    .getPrevNormalizerValue())).call();
3026  }
3027
3028  @Override
3029  public CompletableFuture<Boolean> isNormalizerEnabled() {
3030    return this
3031        .<Boolean> newMasterCaller()
3032        .action(
3033          (controller, stub) -> this
3034              .<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse, Boolean> call(controller,
3035                stub, RequestConverter.buildIsNormalizerEnabledRequest(),
3036                (s, c, req, done) -> s.isNormalizerEnabled(c, req, done),
3037                (resp) -> resp.getEnabled())).call();
3038  }
3039
3040  @Override
3041  public CompletableFuture<Boolean> normalize() {
3042    return this
3043        .<Boolean> newMasterCaller()
3044        .action(
3045          (controller, stub) -> this.<NormalizeRequest, NormalizeResponse, Boolean> call(
3046            controller, stub, RequestConverter.buildNormalizeRequest(),
3047            (s, c, req, done) -> s.normalize(c, req, done), (resp) -> resp.getNormalizerRan()))
3048        .call();
3049  }
3050
3051  @Override
3052  public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) {
3053    return this
3054        .<Boolean> newMasterCaller()
3055        .action(
3056          (controller, stub) -> this
3057              .<SetCleanerChoreRunningRequest, SetCleanerChoreRunningResponse, Boolean> call(
3058                controller, stub, RequestConverter.buildSetCleanerChoreRunningRequest(enabled), (s,
3059                    c, req, done) -> s.setCleanerChoreRunning(c, req, done), (resp) -> resp
3060                    .getPrevValue())).call();
3061  }
3062
3063  @Override
3064  public CompletableFuture<Boolean> isCleanerChoreEnabled() {
3065    return this
3066        .<Boolean> newMasterCaller()
3067        .action(
3068          (controller, stub) -> this
3069              .<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse, Boolean> call(
3070                controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(), (s, c, req,
3071                    done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue()))
3072        .call();
3073  }
3074
3075  @Override
3076  public CompletableFuture<Boolean> runCleanerChore() {
3077    return this
3078        .<Boolean> newMasterCaller()
3079        .action(
3080          (controller, stub) -> this
3081              .<RunCleanerChoreRequest, RunCleanerChoreResponse, Boolean> call(controller, stub,
3082                RequestConverter.buildRunCleanerChoreRequest(),
3083                (s, c, req, done) -> s.runCleanerChore(c, req, done),
3084                (resp) -> resp.getCleanerChoreRan())).call();
3085  }
3086
3087  @Override
3088  public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) {
3089    return this
3090        .<Boolean> newMasterCaller()
3091        .action(
3092          (controller, stub) -> this
3093              .<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse, Boolean> call(
3094                controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled), (s,
3095                    c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp
3096                    .getPrevValue())).call();
3097  }
3098
3099  @Override
3100  public CompletableFuture<Boolean> isCatalogJanitorEnabled() {
3101    return this
3102        .<Boolean> newMasterCaller()
3103        .action(
3104          (controller, stub) -> this
3105              .<IsCatalogJanitorEnabledRequest, IsCatalogJanitorEnabledResponse, Boolean> call(
3106                controller, stub, RequestConverter.buildIsCatalogJanitorEnabledRequest(), (s, c,
3107                    req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp
3108                    .getValue())).call();
3109  }
3110
3111  @Override
3112  public CompletableFuture<Integer> runCatalogJanitor() {
3113    return this
3114        .<Integer> newMasterCaller()
3115        .action(
3116          (controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse, Integer> call(
3117            controller, stub, RequestConverter.buildCatalogScanRequest(),
3118            (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
3119        .call();
3120  }
3121
3122  @Override
3123  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3124      ServiceCaller<S, R> callable) {
3125    MasterCoprocessorRpcChannelImpl channel =
3126        new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
3127    S stub = stubMaker.apply(channel);
3128    CompletableFuture<R> future = new CompletableFuture<>();
3129    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3130    callable.call(stub, controller, resp -> {
3131      if (controller.failed()) {
3132        future.completeExceptionally(controller.getFailed());
3133      } else {
3134        future.complete(resp);
3135      }
3136    });
3137    return future;
3138  }
3139
3140  @Override
3141  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3142      ServiceCaller<S, R> callable, ServerName serverName) {
3143    RegionServerCoprocessorRpcChannelImpl channel =
3144        new RegionServerCoprocessorRpcChannelImpl(this.<Message> newServerCaller().serverName(
3145          serverName));
3146    S stub = stubMaker.apply(channel);
3147    CompletableFuture<R> future = new CompletableFuture<>();
3148    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3149    callable.call(stub, controller, resp -> {
3150      if (controller.failed()) {
3151        future.completeExceptionally(controller.getFailed());
3152      } else {
3153        future.complete(resp);
3154      }
3155    });
3156    return future;
3157  }
3158
3159  @Override
3160  public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
3161    return this.<List<ServerName>> newMasterCaller()
3162      .action((controller, stub) -> this
3163        .<ClearDeadServersRequest, ClearDeadServersResponse, List<ServerName>> call(
3164          controller, stub, RequestConverter.buildClearDeadServersRequest(servers),
3165          (s, c, req, done) -> s.clearDeadServers(c, req, done),
3166          (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
3167      .call();
3168  }
3169
3170  private <T> ServerRequestCallerBuilder<T> newServerCaller() {
3171    return this.connection.callerFactory.<T> serverRequest()
3172        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
3173        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
3174        .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
3175        .startLogErrorsCnt(startLogErrorsCnt);
3176  }
3177
3178  @Override
3179  public CompletableFuture<Void> enableTableReplication(TableName tableName) {
3180    if (tableName == null) {
3181      return failedFuture(new IllegalArgumentException("Table name is null"));
3182    }
3183    CompletableFuture<Void> future = new CompletableFuture<>();
3184    addListener(tableExists(tableName), (exist, err) -> {
3185      if (err != null) {
3186        future.completeExceptionally(err);
3187        return;
3188      }
3189      if (!exist) {
3190        future.completeExceptionally(new TableNotFoundException(
3191          "Table '" + tableName.getNameAsString() + "' does not exists."));
3192        return;
3193      }
3194      addListener(getTableSplits(tableName), (splits, err1) -> {
3195        if (err1 != null) {
3196          future.completeExceptionally(err1);
3197        } else {
3198          addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> {
3199            if (err2 != null) {
3200              future.completeExceptionally(err2);
3201            } else {
3202              addListener(setTableReplication(tableName, true), (result3, err3) -> {
3203                if (err3 != null) {
3204                  future.completeExceptionally(err3);
3205                } else {
3206                  future.complete(result3);
3207                }
3208              });
3209            }
3210          });
3211        }
3212      });
3213    });
3214    return future;
3215  }
3216
3217  @Override
3218  public CompletableFuture<Void> disableTableReplication(TableName tableName) {
3219    if (tableName == null) {
3220      return failedFuture(new IllegalArgumentException("Table name is null"));
3221    }
3222    CompletableFuture<Void> future = new CompletableFuture<>();
3223    addListener(tableExists(tableName), (exist, err) -> {
3224      if (err != null) {
3225        future.completeExceptionally(err);
3226        return;
3227      }
3228      if (!exist) {
3229        future.completeExceptionally(new TableNotFoundException(
3230          "Table '" + tableName.getNameAsString() + "' does not exists."));
3231        return;
3232      }
3233      addListener(setTableReplication(tableName, false), (result, err2) -> {
3234        if (err2 != null) {
3235          future.completeExceptionally(err2);
3236        } else {
3237          future.complete(result);
3238        }
3239      });
3240    });
3241    return future;
3242  }
3243
3244  private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
3245    CompletableFuture<byte[][]> future = new CompletableFuture<>();
3246    addListener(getRegions(tableName), (regions, err2) -> {
3247      if (err2 != null) {
3248        future.completeExceptionally(err2);
3249        return;
3250      }
3251      if (regions.size() == 1) {
3252        future.complete(null);
3253      } else {
3254        byte[][] splits = new byte[regions.size() - 1][];
3255        for (int i = 1; i < regions.size(); i++) {
3256          splits[i - 1] = regions.get(i).getStartKey();
3257        }
3258        future.complete(splits);
3259      }
3260    });
3261    return future;
3262  }
3263
3264  /**
3265   * Connect to peer and check the table descriptor on peer:
3266   * <ol>
3267   * <li>Create the same table on peer when not exist.</li>
3268   * <li>Throw an exception if the table already has replication enabled on any of the column
3269   * families.</li>
3270   * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
3271   * </ol>
3272   * @param tableName name of the table to sync to the peer
3273   * @param splits table split keys
3274   */
3275  private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
3276      byte[][] splits) {
3277    CompletableFuture<Void> future = new CompletableFuture<>();
3278    addListener(listReplicationPeers(), (peers, err) -> {
3279      if (err != null) {
3280        future.completeExceptionally(err);
3281        return;
3282      }
3283      if (peers == null || peers.size() <= 0) {
3284        future.completeExceptionally(
3285          new IllegalArgumentException("Found no peer cluster for replication."));
3286        return;
3287      }
3288      List<CompletableFuture<Void>> futures = new ArrayList<>();
3289      peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
3290        .forEach(peer -> {
3291          futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
3292        });
3293      addListener(
3294        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3295        (result, err2) -> {
3296          if (err2 != null) {
3297            future.completeExceptionally(err2);
3298          } else {
3299            future.complete(result);
3300          }
3301        });
3302    });
3303    return future;
3304  }
3305
3306  private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
3307      ReplicationPeerDescription peer) {
3308    Configuration peerConf = null;
3309    try {
3310      peerConf =
3311        ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer);
3312    } catch (IOException e) {
3313      return failedFuture(e);
3314    }
3315    CompletableFuture<Void> future = new CompletableFuture<>();
3316    addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> {
3317      if (err != null) {
3318        future.completeExceptionally(err);
3319        return;
3320      }
3321      addListener(getDescriptor(tableName), (tableDesc, err1) -> {
3322        if (err1 != null) {
3323          future.completeExceptionally(err1);
3324          return;
3325        }
3326        AsyncAdmin peerAdmin = conn.getAdmin();
3327        addListener(peerAdmin.tableExists(tableName), (exist, err2) -> {
3328          if (err2 != null) {
3329            future.completeExceptionally(err2);
3330            return;
3331          }
3332          if (!exist) {
3333            CompletableFuture<Void> createTableFuture = null;
3334            if (splits == null) {
3335              createTableFuture = peerAdmin.createTable(tableDesc);
3336            } else {
3337              createTableFuture = peerAdmin.createTable(tableDesc, splits);
3338            }
3339            addListener(createTableFuture, (result, err3) -> {
3340              if (err3 != null) {
3341                future.completeExceptionally(err3);
3342              } else {
3343                future.complete(result);
3344              }
3345            });
3346          } else {
3347            addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin),
3348              (result, err4) -> {
3349                if (err4 != null) {
3350                  future.completeExceptionally(err4);
3351                } else {
3352                  future.complete(result);
3353                }
3354              });
3355          }
3356        });
3357      });
3358    });
3359    return future;
3360  }
3361
3362  private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
3363      TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
3364    CompletableFuture<Void> future = new CompletableFuture<>();
3365    addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> {
3366      if (err != null) {
3367        future.completeExceptionally(err);
3368        return;
3369      }
3370      if (peerTableDesc == null) {
3371        future.completeExceptionally(
3372          new IllegalArgumentException("Failed to get table descriptor for table " +
3373            tableName.getNameAsString() + " from peer cluster " + peer.getPeerId()));
3374        return;
3375      }
3376      if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
3377        future.completeExceptionally(new IllegalArgumentException(
3378          "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() +
3379            ", but the table descriptors are not same when compared with source cluster." +
3380            " Thus can not enable the table's replication switch."));
3381        return;
3382      }
3383      future.complete(null);
3384    });
3385    return future;
3386  }
3387
3388  /**
3389   * Set the table's replication switch if the table's replication switch is already not set.
3390   * @param tableName name of the table
3391   * @param enableRep is replication switch enable or disable
3392   */
3393  private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
3394    CompletableFuture<Void> future = new CompletableFuture<>();
3395    addListener(getDescriptor(tableName), (tableDesc, err) -> {
3396      if (err != null) {
3397        future.completeExceptionally(err);
3398        return;
3399      }
3400      if (!tableDesc.matchReplicationScope(enableRep)) {
3401        int scope =
3402          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
3403        TableDescriptor newTableDesc =
3404          TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
3405        addListener(modifyTable(newTableDesc), (result, err2) -> {
3406          if (err2 != null) {
3407            future.completeExceptionally(err2);
3408          } else {
3409            future.complete(result);
3410          }
3411        });
3412      } else {
3413        future.complete(null);
3414      }
3415    });
3416    return future;
3417  }
3418
3419  @Override
3420  public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
3421    CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
3422    addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3423      if (err != null) {
3424        future.completeExceptionally(err);
3425        return;
3426      }
3427      Map<ServerName, List<RegionInfo>> regionInfoByServerName =
3428        locations.stream().filter(l -> l.getRegion() != null)
3429          .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
3430          .collect(Collectors.groupingBy(l -> l.getServerName(),
3431            Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
3432      List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
3433      CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
3434      for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
3435        futures
3436          .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
3437            if (err2 != null) {
3438              future.completeExceptionally(unwrapCompletionException(err2));
3439            } else {
3440              aggregator.append(stats);
3441            }
3442          }));
3443      }
3444      addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
3445        (ret, err3) -> {
3446          if (err3 != null) {
3447            future.completeExceptionally(unwrapCompletionException(err3));
3448          } else {
3449            future.complete(aggregator.sum());
3450          }
3451        });
3452    });
3453    return future;
3454  }
3455
3456  private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
3457      List<RegionInfo> hris) {
3458    return this.<CacheEvictionStats> newAdminCaller().action((controller, stub) -> this
3459      .<ClearRegionBlockCacheRequest, ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(
3460        controller, stub, RequestConverter.buildClearRegionBlockCacheRequest(hris),
3461        (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
3462        resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
3463      .serverName(serverName).call();
3464  }
3465}