001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
024
025import com.google.protobuf.Message;
026import com.google.protobuf.RpcChannel;
027import edu.umd.cs.findbugs.annotations.Nullable;
028import java.io.IOException;
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.Collections;
032import java.util.EnumSet;
033import java.util.HashMap;
034import java.util.List;
035import java.util.Map;
036import java.util.Optional;
037import java.util.Set;
038import java.util.concurrent.CompletableFuture;
039import java.util.concurrent.ConcurrentHashMap;
040import java.util.concurrent.ConcurrentLinkedQueue;
041import java.util.concurrent.TimeUnit;
042import java.util.concurrent.atomic.AtomicReference;
043import java.util.function.BiConsumer;
044import java.util.function.Consumer;
045import java.util.function.Function;
046import java.util.function.Supplier;
047import java.util.regex.Pattern;
048import java.util.stream.Collectors;
049import java.util.stream.Stream;
050import org.apache.hadoop.conf.Configuration;
051import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
052import org.apache.hadoop.hbase.CacheEvictionStats;
053import org.apache.hadoop.hbase.CacheEvictionStatsAggregator;
054import org.apache.hadoop.hbase.ClusterMetrics;
055import org.apache.hadoop.hbase.ClusterMetrics.Option;
056import org.apache.hadoop.hbase.ClusterMetricsBuilder;
057import org.apache.hadoop.hbase.HConstants;
058import org.apache.hadoop.hbase.HRegionLocation;
059import org.apache.hadoop.hbase.MetaTableAccessor;
060import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
061import org.apache.hadoop.hbase.NamespaceDescriptor;
062import org.apache.hadoop.hbase.RegionLocations;
063import org.apache.hadoop.hbase.RegionMetrics;
064import org.apache.hadoop.hbase.RegionMetricsBuilder;
065import org.apache.hadoop.hbase.ServerName;
066import org.apache.hadoop.hbase.TableExistsException;
067import org.apache.hadoop.hbase.TableName;
068import org.apache.hadoop.hbase.TableNotDisabledException;
069import org.apache.hadoop.hbase.TableNotEnabledException;
070import org.apache.hadoop.hbase.TableNotFoundException;
071import org.apache.hadoop.hbase.UnknownRegionException;
072import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
073import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
074import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
075import org.apache.hadoop.hbase.client.Scan.ReadType;
076import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
077import org.apache.hadoop.hbase.client.replication.TableCFs;
078import org.apache.hadoop.hbase.client.security.SecurityCapability;
079import org.apache.hadoop.hbase.exceptions.DeserializationException;
080import org.apache.hadoop.hbase.ipc.HBaseRpcController;
081import org.apache.hadoop.hbase.quotas.QuotaFilter;
082import org.apache.hadoop.hbase.quotas.QuotaSettings;
083import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
084import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
085import org.apache.hadoop.hbase.replication.ReplicationException;
086import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
087import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
088import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
089import org.apache.hadoop.hbase.security.access.Permission;
090import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
091import org.apache.hadoop.hbase.security.access.UserPermission;
092import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
093import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
094import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
095import org.apache.hadoop.hbase.util.Bytes;
096import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
097import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
098import org.apache.yetus.audience.InterfaceAudience;
099import org.slf4j.Logger;
100import org.slf4j.LoggerFactory;
101
102import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
103import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
104import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
105import org.apache.hbase.thirdparty.io.netty.util.Timeout;
106import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
107import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
108
109import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
110import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
111import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
112import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
113import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
114import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse;
115import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
116import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
117import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
147import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
148import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
149import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
150import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
153import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotCleanupEnabledResponse;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerRequest;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerResponse;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerRequest;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerResponse;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse;
254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse;
260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse;
265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
280import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
281import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
282import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
283import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
284import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
285import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
286import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
287import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
288import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
289import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
290import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
291import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
292import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
293import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
294import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
295import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
296import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
297import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
298import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
299import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
300import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
301import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
302import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
303import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
304import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
305
306/**
307 * The implementation of AsyncAdmin.
308 * <p>
309 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
310 * be finished inside the rpc framework thread, which means that the callbacks registered to the
311 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
312 * this class should not try to do time consuming tasks in the callbacks.
313 * @since 2.0.0
314 * @see AsyncHBaseAdmin
315 * @see AsyncConnection#getAdmin()
316 * @see AsyncConnection#getAdminBuilder()
317 */
318@InterfaceAudience.Private
319class RawAsyncHBaseAdmin implements AsyncAdmin {
320  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
321
322  private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class);
323
324  private final AsyncConnectionImpl connection;
325
326  private final HashedWheelTimer retryTimer;
327
328  private final AsyncTable<AdvancedScanResultConsumer> metaTable;
329
330  private final long rpcTimeoutNs;
331
332  private final long operationTimeoutNs;
333
334  private final long pauseNs;
335
336  private final long pauseNsForServerOverloaded;
337
338  private final int maxAttempts;
339
340  private final int startLogErrorsCnt;
341
342  private final NonceGenerator ng;
343
344  RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer,
345    AsyncAdminBuilderBase builder) {
346    this.connection = connection;
347    this.retryTimer = retryTimer;
348    this.metaTable = connection.getTable(META_TABLE_NAME);
349    this.rpcTimeoutNs = builder.rpcTimeoutNs;
350    this.operationTimeoutNs = builder.operationTimeoutNs;
351    this.pauseNs = builder.pauseNs;
352    if (builder.pauseNsForServerOverloaded < builder.pauseNs) {
353      LOG.warn(
354        "Configured value of pauseNsForServerOverloaded is {} ms, which is less than"
355          + " the normal pause value {} ms, use the greater one instead",
356        TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded),
357        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
358      this.pauseNsForServerOverloaded = builder.pauseNs;
359    } else {
360      this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded;
361    }
362    this.maxAttempts = builder.maxAttempts;
363    this.startLogErrorsCnt = builder.startLogErrorsCnt;
364    this.ng = connection.getNonceGenerator();
365  }
366
367  private <T> MasterRequestCallerBuilder<T> newMasterCaller() {
368    return this.connection.callerFactory.<T> masterRequest()
369      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
370      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
371      .pause(pauseNs, TimeUnit.NANOSECONDS)
372      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
373      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
374  }
375
376  private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
377    return this.connection.callerFactory.<T> adminRequest()
378      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
379      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
380      .pause(pauseNs, TimeUnit.NANOSECONDS)
381      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
382      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
383  }
384
385  @FunctionalInterface
386  private interface MasterRpcCall<RESP, REQ> {
387    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
388      RpcCallback<RESP> done);
389  }
390
391  @FunctionalInterface
392  private interface AdminRpcCall<RESP, REQ> {
393    void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
394      RpcCallback<RESP> done);
395  }
396
397  @FunctionalInterface
398  private interface Converter<D, S> {
399    D convert(S src) throws IOException;
400  }
401
402  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
403    MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
404    Converter<RESP, PRESP> respConverter) {
405    CompletableFuture<RESP> future = new CompletableFuture<>();
406    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
407
408      @Override
409      public void run(PRESP resp) {
410        if (controller.failed()) {
411          future.completeExceptionally(controller.getFailed());
412        } else {
413          try {
414            future.complete(respConverter.convert(resp));
415          } catch (IOException e) {
416            future.completeExceptionally(e);
417          }
418        }
419      }
420    });
421    return future;
422  }
423
424  private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
425    AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
426    Converter<RESP, PRESP> respConverter) {
427    CompletableFuture<RESP> future = new CompletableFuture<>();
428    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
429
430      @Override
431      public void run(PRESP resp) {
432        if (controller.failed()) {
433          future.completeExceptionally(controller.getFailed());
434        } else {
435          try {
436            future.complete(respConverter.convert(resp));
437          } catch (IOException e) {
438            future.completeExceptionally(e);
439          }
440        }
441      }
442    });
443    return future;
444  }
445
446  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
447    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
448    ProcedureBiConsumer consumer) {
449    return procedureCall(b -> {
450    }, preq, rpcCall, respConverter, consumer);
451  }
452
453  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq,
454    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
455    ProcedureBiConsumer consumer) {
456    return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer);
457  }
458
459  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(
460    Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
461    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
462    ProcedureBiConsumer consumer) {
463    MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller,
464      stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter));
465    prioritySetter.accept(builder);
466    CompletableFuture<Long> procFuture = builder.call();
467    CompletableFuture<Void> future = waitProcedureResult(procFuture);
468    addListener(future, consumer);
469    return future;
470  }
471
472  @Override
473  public CompletableFuture<Boolean> tableExists(TableName tableName) {
474    if (TableName.isMetaTableName(tableName)) {
475      return CompletableFuture.completedFuture(true);
476    }
477    return AsyncMetaTableAccessor.tableExists(metaTable, tableName);
478  }
479
480  @Override
481  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) {
482    return getTableDescriptors(
483      RequestConverter.buildGetTableDescriptorsRequest(null, includeSysTables));
484  }
485
486  /**
487   * {@link #listTableDescriptors(boolean)}
488   */
489  @Override
490  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern,
491    boolean includeSysTables) {
492    Preconditions.checkNotNull(pattern,
493      "pattern is null. If you don't specify a pattern, use listTables(boolean) instead");
494    return getTableDescriptors(
495      RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables));
496  }
497
498  @Override
499  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) {
500    Preconditions.checkNotNull(tableNames,
501      "tableNames is null. If you don't specify tableNames, " + "use listTables(boolean) instead");
502    if (tableNames.isEmpty()) {
503      return CompletableFuture.completedFuture(Collections.emptyList());
504    }
505    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames));
506  }
507
508  private CompletableFuture<List<TableDescriptor>>
509    getTableDescriptors(GetTableDescriptorsRequest request) {
510    return this.<List<TableDescriptor>> newMasterCaller()
511      .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse,
512        List<TableDescriptor>> call(controller, stub, request,
513          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
514          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
515      .call();
516  }
517
518  @Override
519  public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) {
520    return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables));
521  }
522
523  @Override
524  public CompletableFuture<List<TableName>> listTableNames(Pattern pattern,
525    boolean includeSysTables) {
526    Preconditions.checkNotNull(pattern,
527      "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead");
528    return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables));
529  }
530
531  private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) {
532    return this.<List<TableName>> newMasterCaller()
533      .action((controller, stub) -> this.<GetTableNamesRequest, GetTableNamesResponse,
534        List<TableName>> call(controller, stub, request,
535          (s, c, req, done) -> s.getTableNames(c, req, done),
536          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
537      .call();
538  }
539
540  @Override
541  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) {
542    return this.<List<TableDescriptor>> newMasterCaller()
543      .action((controller, stub) -> this.<ListTableDescriptorsByNamespaceRequest,
544        ListTableDescriptorsByNamespaceResponse, List<TableDescriptor>> call(controller, stub,
545          ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
546          (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done),
547          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
548      .call();
549  }
550
551  @Override
552  public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) {
553    return this.<List<TableName>> newMasterCaller()
554      .action((controller, stub) -> this.<ListTableNamesByNamespaceRequest,
555        ListTableNamesByNamespaceResponse, List<TableName>> call(controller, stub,
556          ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
557          (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done),
558          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList())))
559      .call();
560  }
561
562  @Override
563  public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
564    CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
565    addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName)
566      .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse,
567        List<TableSchema>> call(controller, stub,
568          RequestConverter.buildGetTableDescriptorsRequest(tableName),
569          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
570          (resp) -> resp.getTableSchemaList()))
571      .call(), (tableSchemas, error) -> {
572        if (error != null) {
573          future.completeExceptionally(error);
574          return;
575        }
576        if (!tableSchemas.isEmpty()) {
577          future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
578        } else {
579          future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
580        }
581      });
582    return future;
583  }
584
585  @Override
586  public CompletableFuture<Void> createTable(TableDescriptor desc) {
587    return createTable(desc.getTableName(),
588      RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce()));
589  }
590
591  @Override
592  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
593    int numRegions) {
594    try {
595      return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
596    } catch (IllegalArgumentException e) {
597      return failedFuture(e);
598    }
599  }
600
601  @Override
602  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
603    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
604      + " use createTable(TableDescriptor) instead");
605    try {
606      verifySplitKeys(splitKeys);
607      return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc,
608        splitKeys, ng.getNonceGroup(), ng.newNonce()));
609    } catch (IllegalArgumentException e) {
610      return failedFuture(e);
611    }
612  }
613
614  private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
615    Preconditions.checkNotNull(tableName, "table name is null");
616    return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request,
617      (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
618      new CreateTableProcedureBiConsumer(tableName));
619  }
620
621  @Override
622  public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
623    return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(),
624      RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
625        ng.newNonce()),
626      (s, c, req, done) -> s.modifyTable(c, req, done), (resp) -> resp.getProcId(),
627      new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
628  }
629
630  @Override
631  public CompletableFuture<Void> modifyTableStoreFileTracker(TableName tableName, String dstSFT) {
632    return this.<ModifyTableStoreFileTrackerRequest,
633      ModifyTableStoreFileTrackerResponse> procedureCall(tableName,
634        RequestConverter.buildModifyTableStoreFileTrackerRequest(tableName, dstSFT,
635          ng.getNonceGroup(), ng.newNonce()),
636        (s, c, req, done) -> s.modifyTableStoreFileTracker(c, req, done),
637        (resp) -> resp.getProcId(),
638        new ModifyTableStoreFileTrackerProcedureBiConsumer(this, tableName));
639  }
640
641  @Override
642  public CompletableFuture<Void> deleteTable(TableName tableName) {
643    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName,
644      RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
645      (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
646      new DeleteTableProcedureBiConsumer(tableName));
647  }
648
649  @Override
650  public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
651    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName,
652      RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
653        ng.newNonce()),
654      (s, c, req, done) -> s.truncateTable(c, req, done), (resp) -> resp.getProcId(),
655      new TruncateTableProcedureBiConsumer(tableName));
656  }
657
658  @Override
659  public CompletableFuture<Void> enableTable(TableName tableName) {
660    return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName,
661      RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
662      (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
663      new EnableTableProcedureBiConsumer(tableName));
664  }
665
666  @Override
667  public CompletableFuture<Void> disableTable(TableName tableName) {
668    return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName,
669      RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
670      (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
671      new DisableTableProcedureBiConsumer(tableName));
672  }
673
674  /**
675   * Utility for completing passed TableState {@link CompletableFuture} <code>future</code> using
676   * passed parameters. Sets error or boolean result ('true' if table matches the passed-in
677   * targetState).
678   */
679  private static CompletableFuture<Boolean> completeCheckTableState(
680    CompletableFuture<Boolean> future, TableState tableState, Throwable error,
681    TableState.State targetState, TableName tableName) {
682    if (error != null) {
683      future.completeExceptionally(error);
684    } else {
685      if (tableState != null) {
686        future.complete(tableState.inStates(targetState));
687      } else {
688        future.completeExceptionally(new TableNotFoundException(tableName));
689      }
690    }
691    return future;
692  }
693
694  @Override
695  public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
696    if (TableName.isMetaTableName(tableName)) {
697      return CompletableFuture.completedFuture(true);
698    }
699    CompletableFuture<Boolean> future = new CompletableFuture<>();
700    addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (tableState, error) -> {
701      completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error,
702        TableState.State.ENABLED, tableName);
703    });
704    return future;
705  }
706
707  @Override
708  public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
709    if (TableName.isMetaTableName(tableName)) {
710      return CompletableFuture.completedFuture(false);
711    }
712    CompletableFuture<Boolean> future = new CompletableFuture<>();
713    addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (tableState, error) -> {
714      completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error,
715        TableState.State.DISABLED, tableName);
716    });
717    return future;
718  }
719
720  @Override
721  public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
722    return isTableAvailable(tableName, Optional.empty());
723  }
724
725  @Override
726  public CompletableFuture<Boolean> isTableAvailable(TableName tableName, byte[][] splitKeys) {
727    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
728      + " use isTableAvailable(TableName) instead");
729    return isTableAvailable(tableName, Optional.of(splitKeys));
730  }
731
732  private CompletableFuture<Boolean> isTableAvailable(TableName tableName,
733    Optional<byte[][]> splitKeys) {
734    if (TableName.isMetaTableName(tableName)) {
735      return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream
736        .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
737    }
738    CompletableFuture<Boolean> future = new CompletableFuture<>();
739    addListener(isTableEnabled(tableName), (enabled, error) -> {
740      if (error != null) {
741        if (error instanceof TableNotFoundException) {
742          future.complete(false);
743        } else {
744          future.completeExceptionally(error);
745        }
746        return;
747      }
748      if (!enabled) {
749        future.complete(false);
750      } else {
751        addListener(AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName),
752          (locations, error1) -> {
753            if (error1 != null) {
754              future.completeExceptionally(error1);
755              return;
756            }
757            List<HRegionLocation> notDeployedRegions = locations.stream()
758              .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
759            if (notDeployedRegions.size() > 0) {
760              if (LOG.isDebugEnabled()) {
761                LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
762              }
763              future.complete(false);
764              return;
765            }
766
767            Optional<Boolean> available =
768              splitKeys.map(keys -> compareRegionsWithSplitKeys(locations, keys));
769            future.complete(available.orElse(true));
770          });
771      }
772    });
773    return future;
774  }
775
776  private boolean compareRegionsWithSplitKeys(List<HRegionLocation> locations, byte[][] splitKeys) {
777    int regionCount = 0;
778    for (HRegionLocation location : locations) {
779      RegionInfo info = location.getRegion();
780      if (Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
781        regionCount++;
782        continue;
783      }
784      for (byte[] splitKey : splitKeys) {
785        // Just check if the splitkey is available
786        if (Bytes.equals(info.getStartKey(), splitKey)) {
787          regionCount++;
788          break;
789        }
790      }
791    }
792    return regionCount == splitKeys.length + 1;
793  }
794
795  @Override
796  public CompletableFuture<Void> addColumnFamily(TableName tableName,
797    ColumnFamilyDescriptor columnFamily) {
798    return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
799      RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
800        ng.newNonce()),
801      (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
802      new AddColumnFamilyProcedureBiConsumer(tableName));
803  }
804
805  @Override
806  public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
807    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName,
808      RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
809        ng.newNonce()),
810      (s, c, req, done) -> s.deleteColumn(c, req, done), (resp) -> resp.getProcId(),
811      new DeleteColumnFamilyProcedureBiConsumer(tableName));
812  }
813
814  @Override
815  public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
816    ColumnFamilyDescriptor columnFamily) {
817    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName,
818      RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
819        ng.newNonce()),
820      (s, c, req, done) -> s.modifyColumn(c, req, done), (resp) -> resp.getProcId(),
821      new ModifyColumnFamilyProcedureBiConsumer(tableName));
822  }
823
824  @Override
825  public CompletableFuture<Void> modifyColumnFamilyStoreFileTracker(TableName tableName,
826    byte[] family, String dstSFT) {
827    return this.<ModifyColumnStoreFileTrackerRequest,
828      ModifyColumnStoreFileTrackerResponse> procedureCall(tableName,
829        RequestConverter.buildModifyColumnStoreFileTrackerRequest(tableName, family, dstSFT,
830          ng.getNonceGroup(), ng.newNonce()),
831        (s, c, req, done) -> s.modifyColumnStoreFileTracker(c, req, done),
832        (resp) -> resp.getProcId(),
833        new ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(tableName));
834  }
835
836  @Override
837  public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
838    return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
839      RequestConverter.buildCreateNamespaceRequest(descriptor),
840      (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
841      new CreateNamespaceProcedureBiConsumer(descriptor.getName()));
842  }
843
844  @Override
845  public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
846    return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
847      RequestConverter.buildModifyNamespaceRequest(descriptor),
848      (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
849      new ModifyNamespaceProcedureBiConsumer(descriptor.getName()));
850  }
851
852  @Override
853  public CompletableFuture<Void> deleteNamespace(String name) {
854    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
855      RequestConverter.buildDeleteNamespaceRequest(name),
856      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
857      new DeleteNamespaceProcedureBiConsumer(name));
858  }
859
860  @Override
861  public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
862    return this.<NamespaceDescriptor> newMasterCaller()
863      .action((controller, stub) -> this.<GetNamespaceDescriptorRequest,
864        GetNamespaceDescriptorResponse, NamespaceDescriptor> call(controller, stub,
865          RequestConverter.buildGetNamespaceDescriptorRequest(name),
866          (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done),
867          (resp) -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor())))
868      .call();
869  }
870
871  @Override
872  public CompletableFuture<List<String>> listNamespaces() {
873    return this.<List<String>> newMasterCaller()
874      .action((controller, stub) -> this.<ListNamespacesRequest, ListNamespacesResponse,
875        List<String>> call(controller, stub, ListNamespacesRequest.newBuilder().build(),
876          (s, c, req, done) -> s.listNamespaces(c, req, done),
877          (resp) -> resp.getNamespaceNameList()))
878      .call();
879  }
880
881  @Override
882  public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
883    return this.<List<NamespaceDescriptor>> newMasterCaller()
884      .action((controller, stub) -> this.<ListNamespaceDescriptorsRequest,
885        ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(controller, stub,
886          ListNamespaceDescriptorsRequest.newBuilder().build(),
887          (s, c, req, done) -> s.listNamespaceDescriptors(c, req, done),
888          (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp)))
889      .call();
890  }
891
892  @Override
893  public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
894    return this.<List<RegionInfo>> newAdminCaller()
895      .action((controller, stub) -> this.<GetOnlineRegionRequest, GetOnlineRegionResponse,
896        List<RegionInfo>> adminCall(controller, stub,
897          RequestConverter.buildGetOnlineRegionRequest(),
898          (s, c, req, done) -> s.getOnlineRegion(c, req, done),
899          resp -> ProtobufUtil.getRegionInfos(resp)))
900      .serverName(serverName).call();
901  }
902
903  @Override
904  public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
905    if (tableName.equals(META_TABLE_NAME)) {
906      return connection.registry.getMetaRegionLocations()
907        .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
908          .collect(Collectors.toList()));
909    } else {
910      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).thenApply(
911        locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
912    }
913  }
914
915  @Override
916  public CompletableFuture<Void> flush(TableName tableName) {
917    return flush(tableName, null);
918  }
919
920  @Override
921  public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
922    CompletableFuture<Void> future = new CompletableFuture<>();
923    addListener(tableExists(tableName), (exists, err) -> {
924      if (err != null) {
925        future.completeExceptionally(err);
926      } else if (!exists) {
927        future.completeExceptionally(new TableNotFoundException(tableName));
928      } else {
929        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
930          if (err2 != null) {
931            future.completeExceptionally(err2);
932          } else if (!tableEnabled) {
933            future.completeExceptionally(new TableNotEnabledException(tableName));
934          } else {
935            Map<String, String> props = new HashMap<>();
936            if (columnFamily != null) {
937              props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily));
938            }
939            addListener(
940              execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props),
941              (ret, err3) -> {
942                if (err3 != null) {
943                  future.completeExceptionally(err3);
944                } else {
945                  future.complete(ret);
946                }
947              });
948          }
949        });
950      }
951    });
952    return future;
953  }
954
955  @Override
956  public CompletableFuture<Void> flushRegion(byte[] regionName) {
957    return flushRegion(regionName, null);
958  }
959
960  @Override
961  public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) {
962    CompletableFuture<Void> future = new CompletableFuture<>();
963    addListener(getRegionLocation(regionName), (location, err) -> {
964      if (err != null) {
965        future.completeExceptionally(err);
966        return;
967      }
968      ServerName serverName = location.getServerName();
969      if (serverName == null) {
970        future
971          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
972        return;
973      }
974      addListener(flush(serverName, location.getRegion(), columnFamily), (ret, err2) -> {
975        if (err2 != null) {
976          future.completeExceptionally(err2);
977        } else {
978          future.complete(ret);
979        }
980      });
981    });
982    return future;
983  }
984
985  private CompletableFuture<Void> flush(final ServerName serverName, final RegionInfo regionInfo,
986    byte[] columnFamily) {
987    return this.<Void> newAdminCaller().serverName(serverName)
988      .action(
989        (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse,
990          Void> adminCall(controller, stub, RequestConverter
991            .buildFlushRegionRequest(regionInfo.getRegionName(), columnFamily, false),
992            (s, c, req, done) -> s.flushRegion(c, req, done), resp -> null))
993      .call();
994  }
995
996  @Override
997  public CompletableFuture<Void> flushRegionServer(ServerName sn) {
998    CompletableFuture<Void> future = new CompletableFuture<>();
999    addListener(getRegions(sn), (hRegionInfos, err) -> {
1000      if (err != null) {
1001        future.completeExceptionally(err);
1002        return;
1003      }
1004      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1005      if (hRegionInfos != null) {
1006        hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region, null)));
1007      }
1008      addListener(CompletableFuture.allOf(
1009        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1010          if (err2 != null) {
1011            future.completeExceptionally(err2);
1012          } else {
1013            future.complete(ret);
1014          }
1015        });
1016    });
1017    return future;
1018  }
1019
1020  @Override
1021  public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
1022    return compact(tableName, null, false, compactType);
1023  }
1024
1025  @Override
1026  public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
1027    CompactType compactType) {
1028    Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
1029      + "If you don't specify a columnFamily, use compact(TableName) instead");
1030    return compact(tableName, columnFamily, false, compactType);
1031  }
1032
1033  @Override
1034  public CompletableFuture<Void> compactRegion(byte[] regionName) {
1035    return compactRegion(regionName, null, false);
1036  }
1037
1038  @Override
1039  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
1040    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1041      + " If you don't specify a columnFamily, use compactRegion(regionName) instead");
1042    return compactRegion(regionName, columnFamily, false);
1043  }
1044
1045  @Override
1046  public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
1047    return compact(tableName, null, true, compactType);
1048  }
1049
1050  @Override
1051  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
1052    CompactType compactType) {
1053    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1054      + "If you don't specify a columnFamily, use compact(TableName) instead");
1055    return compact(tableName, columnFamily, true, compactType);
1056  }
1057
1058  @Override
1059  public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
1060    return compactRegion(regionName, null, true);
1061  }
1062
1063  @Override
1064  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
1065    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1066      + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead");
1067    return compactRegion(regionName, columnFamily, true);
1068  }
1069
1070  @Override
1071  public CompletableFuture<Void> compactRegionServer(ServerName sn) {
1072    return compactRegionServer(sn, false);
1073  }
1074
1075  @Override
1076  public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
1077    return compactRegionServer(sn, true);
1078  }
1079
1080  private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
1081    CompletableFuture<Void> future = new CompletableFuture<>();
1082    addListener(getRegions(sn), (hRegionInfos, err) -> {
1083      if (err != null) {
1084        future.completeExceptionally(err);
1085        return;
1086      }
1087      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1088      if (hRegionInfos != null) {
1089        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
1090      }
1091      addListener(CompletableFuture.allOf(
1092        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1093          if (err2 != null) {
1094            future.completeExceptionally(err2);
1095          } else {
1096            future.complete(ret);
1097          }
1098        });
1099    });
1100    return future;
1101  }
1102
1103  private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
1104    boolean major) {
1105    CompletableFuture<Void> future = new CompletableFuture<>();
1106    addListener(getRegionLocation(regionName), (location, err) -> {
1107      if (err != null) {
1108        future.completeExceptionally(err);
1109        return;
1110      }
1111      ServerName serverName = location.getServerName();
1112      if (serverName == null) {
1113        future
1114          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1115        return;
1116      }
1117      addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
1118        (ret, err2) -> {
1119          if (err2 != null) {
1120            future.completeExceptionally(err2);
1121          } else {
1122            future.complete(ret);
1123          }
1124        });
1125    });
1126    return future;
1127  }
1128
1129  /**
1130   * List all region locations for the specific table.
1131   */
1132  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
1133    if (TableName.META_TABLE_NAME.equals(tableName)) {
1134      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
1135      addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
1136        if (err != null) {
1137          future.completeExceptionally(err);
1138        } else if (
1139          metaRegions == null || metaRegions.isEmpty()
1140            || metaRegions.getDefaultRegionLocation() == null
1141        ) {
1142          future.completeExceptionally(new IOException("meta region does not found"));
1143        } else {
1144          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
1145        }
1146      });
1147      return future;
1148    } else {
1149      // For non-meta table, we fetch all locations by scanning hbase:meta table
1150      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName);
1151    }
1152  }
1153
1154  /**
1155   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
1156   */
1157  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
1158    CompactType compactType) {
1159    CompletableFuture<Void> future = new CompletableFuture<>();
1160
1161    switch (compactType) {
1162      case MOB:
1163        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
1164          if (err != null) {
1165            future.completeExceptionally(err);
1166            return;
1167          }
1168          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
1169          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
1170            if (err2 != null) {
1171              future.completeExceptionally(err2);
1172            } else {
1173              future.complete(ret);
1174            }
1175          });
1176        });
1177        break;
1178      case NORMAL:
1179        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
1180          if (err != null) {
1181            future.completeExceptionally(err);
1182            return;
1183          }
1184          if (locations == null || locations.isEmpty()) {
1185            future.completeExceptionally(new TableNotFoundException(tableName));
1186          }
1187          CompletableFuture<?>[] compactFutures =
1188            locations.stream().filter(l -> l.getRegion() != null)
1189              .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
1190              .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
1191              .toArray(CompletableFuture<?>[]::new);
1192          // future complete unless all of the compact futures are completed.
1193          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
1194            if (err2 != null) {
1195              future.completeExceptionally(err2);
1196            } else {
1197              future.complete(ret);
1198            }
1199          });
1200        });
1201        break;
1202      default:
1203        throw new IllegalArgumentException("Unknown compactType: " + compactType);
1204    }
1205    return future;
1206  }
1207
1208  /**
1209   * Compact the region at specific region server.
1210   */
1211  private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
1212    final boolean major, byte[] columnFamily) {
1213    return this.<Void> newAdminCaller().serverName(sn)
1214      .action((controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse,
1215        Void> adminCall(controller, stub,
1216          RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, columnFamily),
1217          (s, c, req, done) -> s.compactRegion(c, req, done), resp -> null))
1218      .call();
1219  }
1220
1221  private byte[] toEncodeRegionName(byte[] regionName) {
1222    return RegionInfo.isEncodedRegionName(regionName)
1223      ? regionName
1224      : Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
1225  }
1226
1227  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
1228    CompletableFuture<TableName> result) {
1229    addListener(getRegionLocation(encodeRegionName), (location, err) -> {
1230      if (err != null) {
1231        result.completeExceptionally(err);
1232        return;
1233      }
1234      RegionInfo regionInfo = location.getRegion();
1235      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1236        result.completeExceptionally(
1237          new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
1238        return;
1239      }
1240      if (!tableName.compareAndSet(null, regionInfo.getTable())) {
1241        if (!tableName.get().equals(regionInfo.getTable())) {
1242          // tables of this two region should be same.
1243          result.completeExceptionally(
1244            new IllegalArgumentException("Cannot merge regions from two different tables "
1245              + tableName.get() + " and " + regionInfo.getTable()));
1246        } else {
1247          result.complete(tableName.get());
1248        }
1249      }
1250    });
1251  }
1252
1253  private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) {
1254    AtomicReference<TableName> tableNameRef = new AtomicReference<>();
1255    CompletableFuture<TableName> future = new CompletableFuture<>();
1256    for (byte[] encodedRegionName : encodedRegionNames) {
1257      checkAndGetTableName(encodedRegionName, tableNameRef, future);
1258    }
1259    return future;
1260  }
1261
1262  @Override
1263  public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) {
1264    return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE);
1265  }
1266
1267  @Override
1268  public CompletableFuture<Boolean> isMergeEnabled() {
1269    return isSplitOrMergeOn(MasterSwitchType.MERGE);
1270  }
1271
1272  @Override
1273  public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) {
1274    return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT);
1275  }
1276
1277  @Override
1278  public CompletableFuture<Boolean> isSplitEnabled() {
1279    return isSplitOrMergeOn(MasterSwitchType.SPLIT);
1280  }
1281
1282  private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous,
1283    MasterSwitchType switchType) {
1284    SetSplitOrMergeEnabledRequest request =
1285      RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType);
1286    return this.<Boolean> newMasterCaller()
1287      .action((controller, stub) -> this.<SetSplitOrMergeEnabledRequest,
1288        SetSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request,
1289          (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done),
1290          (resp) -> resp.getPrevValueList().get(0)))
1291      .call();
1292  }
1293
1294  private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) {
1295    IsSplitOrMergeEnabledRequest request =
1296      RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType);
1297    return this.<Boolean> newMasterCaller()
1298      .action((controller, stub) -> this.<IsSplitOrMergeEnabledRequest,
1299        IsSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request,
1300          (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done), (resp) -> resp.getEnabled()))
1301      .call();
1302  }
1303
1304  @Override
1305  public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) {
1306    if (nameOfRegionsToMerge.size() < 2) {
1307      return failedFuture(new IllegalArgumentException(
1308        "Can not merge only " + nameOfRegionsToMerge.size() + " region"));
1309    }
1310    CompletableFuture<Void> future = new CompletableFuture<>();
1311    byte[][] encodedNameOfRegionsToMerge =
1312      nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new);
1313
1314    addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> {
1315      if (err != null) {
1316        future.completeExceptionally(err);
1317        return;
1318      }
1319
1320      final MergeTableRegionsRequest request;
1321      try {
1322        request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
1323          forcible, ng.getNonceGroup(), ng.newNonce());
1324      } catch (DeserializationException e) {
1325        future.completeExceptionally(e);
1326        return;
1327      }
1328
1329      addListener(
1330        this.procedureCall(tableName, request, MasterService.Interface::mergeTableRegions,
1331          MergeTableRegionsResponse::getProcId, new MergeTableRegionProcedureBiConsumer(tableName)),
1332        (ret, err2) -> {
1333          if (err2 != null) {
1334            future.completeExceptionally(err2);
1335          } else {
1336            future.complete(ret);
1337          }
1338        });
1339    });
1340    return future;
1341  }
1342
1343  @Override
1344  public CompletableFuture<Void> split(TableName tableName) {
1345    CompletableFuture<Void> future = new CompletableFuture<>();
1346    addListener(tableExists(tableName), (exist, error) -> {
1347      if (error != null) {
1348        future.completeExceptionally(error);
1349        return;
1350      }
1351      if (!exist) {
1352        future.completeExceptionally(new TableNotFoundException(tableName));
1353        return;
1354      }
1355      addListener(
1356        metaTable
1357          .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
1358            .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION))
1359            .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION))),
1360        (results, err2) -> {
1361          if (err2 != null) {
1362            future.completeExceptionally(err2);
1363            return;
1364          }
1365          if (results != null && !results.isEmpty()) {
1366            List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
1367            for (Result r : results) {
1368              if (r.isEmpty() || MetaTableAccessor.getRegionInfo(r) == null) {
1369                continue;
1370              }
1371              RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
1372              if (rl != null) {
1373                for (HRegionLocation h : rl.getRegionLocations()) {
1374                  if (h != null && h.getServerName() != null) {
1375                    RegionInfo hri = h.getRegion();
1376                    if (
1377                      hri == null || hri.isSplitParent()
1378                        || hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID
1379                    ) {
1380                      continue;
1381                    }
1382                    splitFutures.add(split(hri, null));
1383                  }
1384                }
1385              }
1386            }
1387            addListener(
1388              CompletableFuture
1389                .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
1390              (ret, exception) -> {
1391                if (exception != null) {
1392                  future.completeExceptionally(exception);
1393                  return;
1394                }
1395                future.complete(ret);
1396              });
1397          } else {
1398            future.complete(null);
1399          }
1400        });
1401    });
1402    return future;
1403  }
1404
1405  @Override
1406  public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
1407    CompletableFuture<Void> result = new CompletableFuture<>();
1408    if (splitPoint == null) {
1409      return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
1410    }
1411    addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true),
1412      (loc, err) -> {
1413        if (err != null) {
1414          result.completeExceptionally(err);
1415        } else if (loc == null || loc.getRegion() == null) {
1416          result.completeExceptionally(new IllegalArgumentException(
1417            "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
1418        } else {
1419          addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
1420            if (err2 != null) {
1421              result.completeExceptionally(err2);
1422            } else {
1423              result.complete(ret);
1424            }
1425
1426          });
1427        }
1428      });
1429    return result;
1430  }
1431
1432  @Override
1433  public CompletableFuture<Void> splitRegion(byte[] regionName) {
1434    CompletableFuture<Void> future = new CompletableFuture<>();
1435    addListener(getRegionLocation(regionName), (location, err) -> {
1436      if (err != null) {
1437        future.completeExceptionally(err);
1438        return;
1439      }
1440      RegionInfo regionInfo = location.getRegion();
1441      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1442        future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. "
1443          + "Replicas are auto-split when their primary is split."));
1444        return;
1445      }
1446      ServerName serverName = location.getServerName();
1447      if (serverName == null) {
1448        future
1449          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1450        return;
1451      }
1452      addListener(split(regionInfo, null), (ret, err2) -> {
1453        if (err2 != null) {
1454          future.completeExceptionally(err2);
1455        } else {
1456          future.complete(ret);
1457        }
1458      });
1459    });
1460    return future;
1461  }
1462
1463  @Override
1464  public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) {
1465    Preconditions.checkNotNull(splitPoint,
1466      "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
1467    CompletableFuture<Void> future = new CompletableFuture<>();
1468    addListener(getRegionLocation(regionName), (location, err) -> {
1469      if (err != null) {
1470        future.completeExceptionally(err);
1471        return;
1472      }
1473      RegionInfo regionInfo = location.getRegion();
1474      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1475        future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. "
1476          + "Replicas are auto-split when their primary is split."));
1477        return;
1478      }
1479      ServerName serverName = location.getServerName();
1480      if (serverName == null) {
1481        future
1482          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1483        return;
1484      }
1485      if (
1486        regionInfo.getStartKey() != null
1487          && Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0
1488      ) {
1489        future.completeExceptionally(
1490          new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
1491        return;
1492      }
1493      addListener(split(regionInfo, splitPoint), (ret, err2) -> {
1494        if (err2 != null) {
1495          future.completeExceptionally(err2);
1496        } else {
1497          future.complete(ret);
1498        }
1499      });
1500    });
1501    return future;
1502  }
1503
1504  private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
1505    CompletableFuture<Void> future = new CompletableFuture<>();
1506    TableName tableName = hri.getTable();
1507    final SplitTableRegionRequest request;
1508    try {
1509      request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
1510        ng.newNonce());
1511    } catch (DeserializationException e) {
1512      future.completeExceptionally(e);
1513      return future;
1514    }
1515
1516    addListener(
1517      this.procedureCall(tableName, request, MasterService.Interface::splitRegion,
1518        SplitTableRegionResponse::getProcId, new SplitTableRegionProcedureBiConsumer(tableName)),
1519      (ret, err2) -> {
1520        if (err2 != null) {
1521          future.completeExceptionally(err2);
1522        } else {
1523          future.complete(ret);
1524        }
1525      });
1526    return future;
1527  }
1528
1529  @Override
1530  public CompletableFuture<Void> assign(byte[] regionName) {
1531    CompletableFuture<Void> future = new CompletableFuture<>();
1532    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1533      if (err != null) {
1534        future.completeExceptionally(err);
1535        return;
1536      }
1537      addListener(
1538        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1539          .action((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
1540            controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
1541            (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null))
1542          .call(),
1543        (ret, err2) -> {
1544          if (err2 != null) {
1545            future.completeExceptionally(err2);
1546          } else {
1547            future.complete(ret);
1548          }
1549        });
1550    });
1551    return future;
1552  }
1553
1554  @Override
1555  public CompletableFuture<Void> unassign(byte[] regionName) {
1556    CompletableFuture<Void> future = new CompletableFuture<>();
1557    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1558      if (err != null) {
1559        future.completeExceptionally(err);
1560        return;
1561      }
1562      addListener(
1563        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1564          .action((controller, stub) -> this.<UnassignRegionRequest, UnassignRegionResponse,
1565            Void> call(controller, stub,
1566              RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()),
1567              (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))
1568          .call(),
1569        (ret, err2) -> {
1570          if (err2 != null) {
1571            future.completeExceptionally(err2);
1572          } else {
1573            future.complete(ret);
1574          }
1575        });
1576    });
1577    return future;
1578  }
1579
1580  @Override
1581  public CompletableFuture<Void> offline(byte[] regionName) {
1582    CompletableFuture<Void> future = new CompletableFuture<>();
1583    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1584      if (err != null) {
1585        future.completeExceptionally(err);
1586        return;
1587      }
1588      addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable())
1589        .action((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call(
1590          controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
1591          (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null))
1592        .call(), (ret, err2) -> {
1593          if (err2 != null) {
1594            future.completeExceptionally(err2);
1595          } else {
1596            future.complete(ret);
1597          }
1598        });
1599    });
1600    return future;
1601  }
1602
1603  @Override
1604  public CompletableFuture<Void> move(byte[] regionName) {
1605    CompletableFuture<Void> future = new CompletableFuture<>();
1606    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1607      if (err != null) {
1608        future.completeExceptionally(err);
1609        return;
1610      }
1611      addListener(
1612        moveRegion(regionInfo,
1613          RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
1614        (ret, err2) -> {
1615          if (err2 != null) {
1616            future.completeExceptionally(err2);
1617          } else {
1618            future.complete(ret);
1619          }
1620        });
1621    });
1622    return future;
1623  }
1624
1625  @Override
1626  public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) {
1627    Preconditions.checkNotNull(destServerName,
1628      "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
1629    CompletableFuture<Void> future = new CompletableFuture<>();
1630    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1631      if (err != null) {
1632        future.completeExceptionally(err);
1633        return;
1634      }
1635      addListener(
1636        moveRegion(regionInfo, RequestConverter
1637          .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
1638        (ret, err2) -> {
1639          if (err2 != null) {
1640            future.completeExceptionally(err2);
1641          } else {
1642            future.complete(ret);
1643          }
1644        });
1645    });
1646    return future;
1647  }
1648
1649  private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) {
1650    return this.<Void> newMasterCaller().priority(regionInfo.getTable())
1651      .action(
1652        (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
1653          stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null))
1654      .call();
1655  }
1656
1657  @Override
1658  public CompletableFuture<Void> setQuota(QuotaSettings quota) {
1659    return this.<Void> newMasterCaller()
1660      .action((controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
1661        stub, QuotaSettings.buildSetQuotaRequestProto(quota),
1662        (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null))
1663      .call();
1664  }
1665
1666  @Override
1667  public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
1668    CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
1669    Scan scan = QuotaTableUtil.makeScan(filter);
1670    this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build().scan(scan,
1671      new AdvancedScanResultConsumer() {
1672        List<QuotaSettings> settings = new ArrayList<>();
1673
1674        @Override
1675        public void onNext(Result[] results, ScanController controller) {
1676          for (Result result : results) {
1677            try {
1678              QuotaTableUtil.parseResultToCollection(result, settings);
1679            } catch (IOException e) {
1680              controller.terminate();
1681              future.completeExceptionally(e);
1682            }
1683          }
1684        }
1685
1686        @Override
1687        public void onError(Throwable error) {
1688          future.completeExceptionally(error);
1689        }
1690
1691        @Override
1692        public void onComplete() {
1693          future.complete(settings);
1694        }
1695      });
1696    return future;
1697  }
1698
1699  @Override
1700  public CompletableFuture<Void> addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig,
1701    boolean enabled) {
1702    return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
1703      RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
1704      (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1705      new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
1706  }
1707
1708  @Override
1709  public CompletableFuture<Void> removeReplicationPeer(String peerId) {
1710    return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall(
1711      RequestConverter.buildRemoveReplicationPeerRequest(peerId),
1712      (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1713      new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER"));
1714  }
1715
1716  @Override
1717  public CompletableFuture<Void> enableReplicationPeer(String peerId) {
1718    return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall(
1719      RequestConverter.buildEnableReplicationPeerRequest(peerId),
1720      (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1721      new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
1722  }
1723
1724  @Override
1725  public CompletableFuture<Void> disableReplicationPeer(String peerId) {
1726    return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall(
1727      RequestConverter.buildDisableReplicationPeerRequest(peerId),
1728      (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1729      new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
1730  }
1731
1732  @Override
1733  public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
1734    return this.<ReplicationPeerConfig> newMasterCaller()
1735      .action((controller, stub) -> this.<GetReplicationPeerConfigRequest,
1736        GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(controller, stub,
1737          RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
1738          (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
1739          (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig())))
1740      .call();
1741  }
1742
1743  @Override
1744  public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
1745    ReplicationPeerConfig peerConfig) {
1746    return this.<UpdateReplicationPeerConfigRequest,
1747      UpdateReplicationPeerConfigResponse> procedureCall(
1748        RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
1749        (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
1750        (resp) -> resp.getProcId(),
1751        new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
1752  }
1753
1754  @Override
1755  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
1756    Map<TableName, List<String>> tableCfs) {
1757    if (tableCfs == null) {
1758      return failedFuture(new ReplicationException("tableCfs is null"));
1759    }
1760
1761    CompletableFuture<Void> future = new CompletableFuture<Void>();
1762    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1763      if (!completeExceptionally(future, error)) {
1764        ReplicationPeerConfig newPeerConfig =
1765          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
1766        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1767          if (!completeExceptionally(future, error)) {
1768            future.complete(result);
1769          }
1770        });
1771      }
1772    });
1773    return future;
1774  }
1775
1776  @Override
1777  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
1778    Map<TableName, List<String>> tableCfs) {
1779    if (tableCfs == null) {
1780      return failedFuture(new ReplicationException("tableCfs is null"));
1781    }
1782
1783    CompletableFuture<Void> future = new CompletableFuture<Void>();
1784    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1785      if (!completeExceptionally(future, error)) {
1786        ReplicationPeerConfig newPeerConfig = null;
1787        try {
1788          newPeerConfig = ReplicationPeerConfigUtil
1789            .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
1790        } catch (ReplicationException e) {
1791          future.completeExceptionally(e);
1792          return;
1793        }
1794        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1795          if (!completeExceptionally(future, error)) {
1796            future.complete(result);
1797          }
1798        });
1799      }
1800    });
1801    return future;
1802  }
1803
1804  @Override
1805  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() {
1806    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null));
1807  }
1808
1809  @Override
1810  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) {
1811    Preconditions.checkNotNull(pattern,
1812      "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead");
1813    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern));
1814  }
1815
1816  private CompletableFuture<List<ReplicationPeerDescription>>
1817    listReplicationPeers(ListReplicationPeersRequest request) {
1818    return this.<List<ReplicationPeerDescription>> newMasterCaller()
1819      .action((controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse,
1820        List<ReplicationPeerDescription>> call(controller, stub, request,
1821          (s, c, req, done) -> s.listReplicationPeers(c, req, done),
1822          (resp) -> resp.getPeerDescList().stream()
1823            .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
1824            .collect(Collectors.toList())))
1825      .call();
1826  }
1827
1828  @Override
1829  public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
1830    CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
1831    addListener(listTableDescriptors(), (tables, error) -> {
1832      if (!completeExceptionally(future, error)) {
1833        List<TableCFs> replicatedTableCFs = new ArrayList<>();
1834        tables.forEach(table -> {
1835          Map<String, Integer> cfs = new HashMap<>();
1836          Stream.of(table.getColumnFamilies())
1837            .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
1838            .forEach(column -> {
1839              cfs.put(column.getNameAsString(), column.getScope());
1840            });
1841          if (!cfs.isEmpty()) {
1842            replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
1843          }
1844        });
1845        future.complete(replicatedTableCFs);
1846      }
1847    });
1848    return future;
1849  }
1850
  /**
   * Take a snapshot and wait, by polling, until the master reports it as done.
   * <p>
   * The description is converted to its protobuf form and validated; a validation failure is
   * returned as a failed future. The snapshot request is then sent to the master, whose response
   * carries an expected timeout. A timer task polls {@code isSnapshotFinished} until it reports
   * done, a poll fails, or the expected timeout has elapsed.
   * @param snapshotDesc description of the snapshot to take
   * @return a future completed with {@code null} once the snapshot is reported done, or
   *         exceptionally with the validation error, the request error, a poll error, or a
   *         {@link SnapshotCreationException} when the expected timeout elapses first
   */
  @Override
  public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
    SnapshotProtos.SnapshotDescription snapshot =
      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
    try {
      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
    } catch (IllegalArgumentException e) {
      return failedFuture(e);
    }
    CompletableFuture<Void> future = new CompletableFuture<>();
    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
    addListener(this.<Long> newMasterCaller()
      .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
        stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
        resp -> resp.getExpectedTimeout()))
      .call(), (expectedTimeout, err) -> {
        if (err != null) {
          future.completeExceptionally(err);
          return;
        }
        // Self-rescheduling poller: each run either finishes the future or schedules itself again.
        TimerTask pollingTask = new TimerTask() {
          // number of polls that reported "not done" so far
          int tries = 0;
          long startTime = EnvironmentEdgeManager.currentTime();
          // polling stops once the current time reaches this value
          long endTime = startTime + expectedTimeout;
          // upper bound for the pause between two polls
          long maxPauseTime = expectedTimeout / maxAttempts;

          @Override
          public void run(Timeout timeout) throws Exception {
            if (EnvironmentEdgeManager.currentTime() < endTime) {
              addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> {
                if (err2 != null) {
                  future.completeExceptionally(err2);
                } else if (done) {
                  future.complete(null);
                } else {
                  // retry again after pauseTime.
                  long pauseTime =
                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
                  pauseTime = Math.min(pauseTime, maxPauseTime);
                  // 'this' is the enclosing TimerTask, so the same task (and its counters) is reused.
                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
                    TimeUnit.MILLISECONDS);
                }
              });
            } else {
              // The expected timeout elapsed before the snapshot was reported done.
              future.completeExceptionally(
                new SnapshotCreationException("Snapshot '" + snapshot.getName()
                  + "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshotDesc));
            }
          }
        };
        // First poll is scheduled 1 ms from now.
        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
      });
    return future;
  }
1905
1906  @Override
1907  public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
1908    return this.<Boolean> newMasterCaller()
1909      .action((controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse,
1910        Boolean> call(controller, stub,
1911          IsSnapshotDoneRequest.newBuilder()
1912            .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
1913          (s, c, req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone()))
1914      .call();
1915  }
1916
1917  @Override
1918  public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
1919    boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
1920      HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
1921      HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
1922    return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
1923  }
1924
1925  @Override
1926  public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot,
1927    boolean restoreAcl) {
1928    CompletableFuture<Void> future = new CompletableFuture<>();
1929    addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
1930      if (err != null) {
1931        future.completeExceptionally(err);
1932        return;
1933      }
1934      TableName tableName = null;
1935      if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
1936        for (SnapshotDescription snap : snapshotDescriptions) {
1937          if (snap.getName().equals(snapshotName)) {
1938            tableName = snap.getTableName();
1939            break;
1940          }
1941        }
1942      }
1943      if (tableName == null) {
1944        future.completeExceptionally(new RestoreSnapshotException(
1945          "Unable to find the table name for snapshot=" + snapshotName));
1946        return;
1947      }
1948      final TableName finalTableName = tableName;
1949      addListener(tableExists(finalTableName), (exists, err2) -> {
1950        if (err2 != null) {
1951          future.completeExceptionally(err2);
1952        } else if (!exists) {
1953          // if table does not exist, then just clone snapshot into new table.
1954          completeConditionalOnFuture(future,
1955            internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl, null));
1956        } else {
1957          addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
1958            if (err4 != null) {
1959              future.completeExceptionally(err4);
1960            } else if (!disabled) {
1961              future.completeExceptionally(new TableNotDisabledException(finalTableName));
1962            } else {
1963              completeConditionalOnFuture(future,
1964                restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl));
1965            }
1966          });
1967        }
1968      });
1969    });
1970    return future;
1971  }
1972
1973  private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
1974    boolean takeFailSafeSnapshot, boolean restoreAcl) {
1975    if (takeFailSafeSnapshot) {
1976      CompletableFuture<Void> future = new CompletableFuture<>();
1977      // Step.1 Take a snapshot of the current state
1978      String failSafeSnapshotSnapshotNameFormat =
1979        this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
1980          HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
1981      final String failSafeSnapshotSnapshotName =
1982        failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
1983          .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
1984          .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
1985      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
1986      addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
1987        if (err != null) {
1988          future.completeExceptionally(err);
1989        } else {
1990          // Step.2 Restore snapshot
1991          addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null),
1992            (void2, err2) -> {
1993              if (err2 != null) {
1994                // Step.3.a Something went wrong during the restore and try to rollback.
1995                addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName,
1996                  restoreAcl, null), (void3, err3) -> {
1997                    if (err3 != null) {
1998                      future.completeExceptionally(err3);
1999                    } else {
2000                      String msg =
2001                        "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot="
2002                          + failSafeSnapshotSnapshotName + " succeeded.";
2003                      future.completeExceptionally(new RestoreSnapshotException(msg, err2));
2004                    }
2005                  });
2006              } else {
2007                // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
2008                LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2009                addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
2010                  if (err3 != null) {
2011                    LOG.error(
2012                      "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
2013                      err3);
2014                    future.completeExceptionally(err3);
2015                  } else {
2016                    future.complete(ret3);
2017                  }
2018                });
2019              }
2020            });
2021        }
2022      });
2023      return future;
2024    } else {
2025      return internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null);
2026    }
2027  }
2028
2029  private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
2030    CompletableFuture<T> parentFuture) {
2031    addListener(parentFuture, (res, err) -> {
2032      if (err != null) {
2033        dependentFuture.completeExceptionally(err);
2034      } else {
2035        dependentFuture.complete(res);
2036      }
2037    });
2038  }
2039
2040  @Override
2041  public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName,
2042    boolean restoreAcl, String customSFT) {
2043    CompletableFuture<Void> future = new CompletableFuture<>();
2044    addListener(tableExists(tableName), (exists, err) -> {
2045      if (err != null) {
2046        future.completeExceptionally(err);
2047      } else if (exists) {
2048        future.completeExceptionally(new TableExistsException(tableName));
2049      } else {
2050        completeConditionalOnFuture(future,
2051          internalRestoreSnapshot(snapshotName, tableName, restoreAcl, customSFT));
2052      }
2053    });
2054    return future;
2055  }
2056
2057  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName,
2058    boolean restoreAcl, String customSFT) {
2059    SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
2060      .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2061    try {
2062      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2063    } catch (IllegalArgumentException e) {
2064      return failedFuture(e);
2065    }
2066    RestoreSnapshotRequest.Builder builder =
2067      RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
2068        .setNonce(ng.newNonce()).setRestoreACL(restoreAcl);
2069    if (customSFT != null) {
2070      builder.setCustomSFT(customSFT);
2071    }
2072    return waitProcedureResult(this.<Long> newMasterCaller()
2073      .action((controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse,
2074        Long> call(controller, stub, builder.build(),
2075          (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId()))
2076      .call());
2077  }
2078
2079  @Override
2080  public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
2081    return getCompletedSnapshots(null);
2082  }
2083
2084  @Override
2085  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
2086    Preconditions.checkNotNull(pattern,
2087      "pattern is null. If you don't specify a pattern, use listSnapshots() instead");
2088    return getCompletedSnapshots(pattern);
2089  }
2090
2091  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) {
2092    return this.<List<SnapshotDescription>> newMasterCaller()
2093      .action((controller, stub) -> this.<GetCompletedSnapshotsRequest,
2094        GetCompletedSnapshotsResponse, List<SnapshotDescription>> call(controller, stub,
2095          GetCompletedSnapshotsRequest.newBuilder().build(),
2096          (s, c, req, done) -> s.getCompletedSnapshots(c, req, done),
2097          resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern)))
2098      .call();
2099  }
2100
2101  @Override
2102  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) {
2103    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2104      + " If you don't specify a tableNamePattern, use listSnapshots() instead");
2105    return getCompletedSnapshots(tableNamePattern, null);
2106  }
2107
2108  @Override
2109  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
2110    Pattern snapshotNamePattern) {
2111    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2112      + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead");
2113    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2114      + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead");
2115    return getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2116  }
2117
2118  private CompletableFuture<List<SnapshotDescription>>
2119    getCompletedSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern) {
2120    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
2121    addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
2122      if (err != null) {
2123        future.completeExceptionally(err);
2124        return;
2125      }
2126      if (tableNames == null || tableNames.size() <= 0) {
2127        future.complete(Collections.emptyList());
2128        return;
2129      }
2130      addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
2131        if (err2 != null) {
2132          future.completeExceptionally(err2);
2133          return;
2134        }
2135        if (snapshotDescList == null || snapshotDescList.isEmpty()) {
2136          future.complete(Collections.emptyList());
2137          return;
2138        }
2139        future.complete(snapshotDescList.stream()
2140          .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
2141          .collect(Collectors.toList()));
2142      });
2143    });
2144    return future;
2145  }
2146
2147  @Override
2148  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
2149    return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
2150  }
2151
2152  @Override
2153  public CompletableFuture<Void> deleteSnapshots() {
2154    return internalDeleteSnapshots(null, null);
2155  }
2156
2157  @Override
2158  public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
2159    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2160      + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead");
2161    return internalDeleteSnapshots(null, snapshotNamePattern);
2162  }
2163
2164  @Override
2165  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) {
2166    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2167      + " If you don't specify a tableNamePattern, use deleteSnapshots() instead");
2168    return internalDeleteSnapshots(tableNamePattern, null);
2169  }
2170
2171  @Override
2172  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
2173    Pattern snapshotNamePattern) {
2174    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2175      + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead");
2176    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2177      + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead");
2178    return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern);
2179  }
2180
2181  private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern,
2182    Pattern snapshotNamePattern) {
2183    CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture;
2184    if (tableNamePattern == null) {
2185      listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern);
2186    } else {
2187      listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2188    }
2189    CompletableFuture<Void> future = new CompletableFuture<>();
2190    addListener(listSnapshotsFuture, (snapshotDescriptions, err) -> {
2191      if (err != null) {
2192        future.completeExceptionally(err);
2193        return;
2194      }
2195      if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
2196        future.complete(null);
2197        return;
2198      }
2199      addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
2200        .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
2201          if (e != null) {
2202            future.completeExceptionally(e);
2203          } else {
2204            future.complete(v);
2205          }
2206        });
2207    });
2208    return future;
2209  }
2210
2211  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
2212    return this.<Void> newMasterCaller()
2213      .action((controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
2214        controller, stub,
2215        DeleteSnapshotRequest.newBuilder()
2216          .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
2217        (s, c, req, done) -> s.deleteSnapshot(c, req, done), resp -> null))
2218      .call();
2219  }
2220
2221  @Override
2222  public CompletableFuture<Void> execProcedure(String signature, String instance,
2223    Map<String, String> props) {
2224    CompletableFuture<Void> future = new CompletableFuture<>();
2225    ProcedureDescription procDesc =
2226      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2227    addListener(this.<Long> newMasterCaller()
2228      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
2229        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
2230        (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
2231      .call(), (expectedTimeout, err) -> {
2232        if (err != null) {
2233          future.completeExceptionally(err);
2234          return;
2235        }
2236        TimerTask pollingTask = new TimerTask() {
2237          int tries = 0;
2238          long startTime = EnvironmentEdgeManager.currentTime();
2239          long endTime = startTime + expectedTimeout;
2240          long maxPauseTime = expectedTimeout / maxAttempts;
2241
2242          @Override
2243          public void run(Timeout timeout) throws Exception {
2244            if (EnvironmentEdgeManager.currentTime() < endTime) {
2245              addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
2246                if (err2 != null) {
2247                  future.completeExceptionally(err2);
2248                  return;
2249                }
2250                if (done) {
2251                  future.complete(null);
2252                } else {
2253                  // retry again after pauseTime.
2254                  long pauseTime =
2255                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2256                  pauseTime = Math.min(pauseTime, maxPauseTime);
2257                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
2258                    TimeUnit.MICROSECONDS);
2259                }
2260              });
2261            } else {
2262              future.completeExceptionally(new IOException("Procedure '" + signature + " : "
2263                + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
2264            }
2265          }
2266        };
2267        // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
2268        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2269      });
2270    return future;
2271  }
2272
2273  @Override
2274  public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance,
2275    Map<String, String> props) {
2276    ProcedureDescription proDesc =
2277      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2278    return this.<byte[]> newMasterCaller()
2279      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
2280        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
2281        (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
2282        resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
2283      .call();
2284  }
2285
2286  @Override
2287  public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
2288    Map<String, String> props) {
2289    ProcedureDescription proDesc =
2290      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2291    return this.<Boolean> newMasterCaller()
2292      .action(
2293        (controller, stub) -> this.<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(
2294          controller, stub, IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
2295          (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
2296      .call();
2297  }
2298
2299  @Override
2300  public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
2301    return this.<Boolean> newMasterCaller().action(
2302      (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
2303        controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
2304        (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
2305      .call();
2306  }
2307
2308  @Override
2309  public CompletableFuture<String> getProcedures() {
2310    return this.<String> newMasterCaller()
2311      .action((controller, stub) -> this.<GetProceduresRequest, GetProceduresResponse, String> call(
2312        controller, stub, GetProceduresRequest.newBuilder().build(),
2313        (s, c, req, done) -> s.getProcedures(c, req, done),
2314        resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList())))
2315      .call();
2316  }
2317
2318  @Override
2319  public CompletableFuture<String> getLocks() {
2320    return this.<String> newMasterCaller()
2321      .action(
2322        (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(controller,
2323          stub, GetLocksRequest.newBuilder().build(), (s, c, req, done) -> s.getLocks(c, req, done),
2324          resp -> ProtobufUtil.toLockJson(resp.getLockList())))
2325      .call();
2326  }
2327
2328  @Override
2329  public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers,
2330    boolean offload) {
2331    return this.<Void> newMasterCaller()
2332      .action((controller, stub) -> this.<DecommissionRegionServersRequest,
2333        DecommissionRegionServersResponse, Void> call(controller, stub,
2334          RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
2335          (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
2336      .call();
2337  }
2338
2339  @Override
2340  public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
2341    return this.<List<ServerName>> newMasterCaller()
2342      .action((controller, stub) -> this.<ListDecommissionedRegionServersRequest,
2343        ListDecommissionedRegionServersResponse, List<ServerName>> call(controller, stub,
2344          ListDecommissionedRegionServersRequest.newBuilder().build(),
2345          (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
2346          resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
2347            .collect(Collectors.toList())))
2348      .call();
2349  }
2350
2351  @Override
2352  public CompletableFuture<Void> recommissionRegionServer(ServerName server,
2353    List<byte[]> encodedRegionNames) {
2354    return this.<Void> newMasterCaller()
2355      .action((controller, stub) -> this.<RecommissionRegionServerRequest,
2356        RecommissionRegionServerResponse, Void> call(controller, stub,
2357          RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames),
2358          (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null))
2359      .call();
2360  }
2361
2362  /**
2363   * Get the region location for the passed region name. The region name may be a full region name
2364   * or encoded region name. If the region does not found, then it'll throw an
2365   * UnknownRegionException wrapped by a {@link CompletableFuture}
2366   * @param regionNameOrEncodedRegionName region name or encoded region name
2367   * @return region location, wrapped by a {@link CompletableFuture}
2368   */
2369  CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
2370    if (regionNameOrEncodedRegionName == null) {
2371      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2372    }
2373
2374    CompletableFuture<Optional<HRegionLocation>> future;
2375    if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
2376      String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
2377      if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
2378        // old format encodedName, should be meta region
2379        future = connection.registry.getMetaRegionLocations()
2380          .thenApply(locs -> Stream.of(locs.getRegionLocations())
2381            .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
2382      } else {
2383        future = AsyncMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
2384          regionNameOrEncodedRegionName);
2385      }
2386    } else {
2387      // Not all regionNameOrEncodedRegionName here is going to be a valid region name,
2388      // it needs to throw out IllegalArgumentException in case tableName is passed in.
2389      RegionInfo regionInfo;
2390      try {
2391        regionInfo = MetaTableAccessor.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName);
2392      } catch (IOException ioe) {
2393        return failedFuture(new IllegalArgumentException(ioe.getMessage()));
2394      }
2395
2396      if (regionInfo.isMetaRegion()) {
2397        future = connection.registry.getMetaRegionLocations()
2398          .thenApply(locs -> Stream.of(locs.getRegionLocations())
2399            .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
2400            .findFirst());
2401      } else {
2402        future = AsyncMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
2403      }
2404    }
2405
2406    CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
2407    addListener(future, (location, err) -> {
2408      if (err != null) {
2409        returnedFuture.completeExceptionally(err);
2410        return;
2411      }
2412      if (!location.isPresent() || location.get().getRegion() == null) {
2413        returnedFuture.completeExceptionally(
2414          new UnknownRegionException("Invalid region name or encoded region name: "
2415            + Bytes.toStringBinary(regionNameOrEncodedRegionName)));
2416      } else {
2417        returnedFuture.complete(location.get());
2418      }
2419    });
2420    return returnedFuture;
2421  }
2422
2423  /**
2424   * Get the region info for the passed region name. The region name may be a full region name or
2425   * encoded region name. If the region does not found, then it'll throw an UnknownRegionException
2426   * wrapped by a {@link CompletableFuture}
2427   * @return region info, wrapped by a {@link CompletableFuture}
2428   */
2429  private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
2430    if (regionNameOrEncodedRegionName == null) {
2431      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2432    }
2433
2434    if (
2435      Bytes.equals(regionNameOrEncodedRegionName,
2436        RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName())
2437        || Bytes.equals(regionNameOrEncodedRegionName,
2438          RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())
2439    ) {
2440      return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
2441    }
2442
2443    CompletableFuture<RegionInfo> future = new CompletableFuture<>();
2444    addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
2445      if (err != null) {
2446        future.completeExceptionally(err);
2447      } else {
2448        future.complete(location.getRegion());
2449      }
2450    });
2451    return future;
2452  }
2453
2454  private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
2455    if (numRegions < 3) {
2456      throw new IllegalArgumentException("Must create at least three regions");
2457    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
2458      throw new IllegalArgumentException("Start key must be smaller than end key");
2459    }
2460    if (numRegions == 3) {
2461      return new byte[][] { startKey, endKey };
2462    }
2463    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
2464    if (splitKeys == null || splitKeys.length != numRegions - 1) {
2465      throw new IllegalArgumentException("Unable to split key range into enough regions");
2466    }
2467    return splitKeys;
2468  }
2469
2470  private void verifySplitKeys(byte[][] splitKeys) {
2471    Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
2472    // Verify there are no duplicate split keys
2473    byte[] lastKey = null;
2474    for (byte[] splitKey : splitKeys) {
2475      if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
2476        throw new IllegalArgumentException("Empty split key must not be passed in the split keys.");
2477      }
2478      if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
2479        throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: "
2480          + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey));
2481      }
2482      lastKey = splitKey;
2483    }
2484  }
2485
2486  private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
2487
2488    abstract void onFinished();
2489
2490    abstract void onError(Throwable error);
2491
2492    @Override
2493    public void accept(Void v, Throwable error) {
2494      if (error != null) {
2495        onError(error);
2496        return;
2497      }
2498      onFinished();
2499    }
2500  }
2501
2502  private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
2503    protected final TableName tableName;
2504
2505    TableProcedureBiConsumer(TableName tableName) {
2506      this.tableName = tableName;
2507    }
2508
2509    abstract String getOperationType();
2510
2511    String getDescription() {
2512      return "Operation: " + getOperationType() + ", " + "Table Name: "
2513        + tableName.getNameWithNamespaceInclAsString();
2514    }
2515
2516    @Override
2517    void onFinished() {
2518      LOG.info(getDescription() + " completed");
2519    }
2520
2521    @Override
2522    void onError(Throwable error) {
2523      LOG.info(getDescription() + " failed with " + error.getMessage());
2524    }
2525  }
2526
2527  private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
2528    protected final String namespaceName;
2529
2530    NamespaceProcedureBiConsumer(String namespaceName) {
2531      this.namespaceName = namespaceName;
2532    }
2533
2534    abstract String getOperationType();
2535
2536    String getDescription() {
2537      return "Operation: " + getOperationType() + ", Namespace: " + namespaceName;
2538    }
2539
2540    @Override
2541    void onFinished() {
2542      LOG.info(getDescription() + " completed");
2543    }
2544
2545    @Override
2546    void onError(Throwable error) {
2547      LOG.info(getDescription() + " failed with " + error.getMessage());
2548    }
2549  }
2550
2551  private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2552
2553    CreateTableProcedureBiConsumer(TableName tableName) {
2554      super(tableName);
2555    }
2556
2557    @Override
2558    String getOperationType() {
2559      return "CREATE";
2560    }
2561  }
2562
2563  private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
2564
2565    ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2566      super(tableName);
2567    }
2568
2569    @Override
2570    String getOperationType() {
2571      return "MODIFY";
2572    }
2573  }
2574
2575  private static class ModifyTableStoreFileTrackerProcedureBiConsumer
2576    extends TableProcedureBiConsumer {
2577
2578    ModifyTableStoreFileTrackerProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2579      super(tableName);
2580    }
2581
2582    @Override
2583    String getOperationType() {
2584      return "MODIFY_TABLE_STORE_FILE_TRACKER";
2585    }
2586  }
2587
2588  private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
2589
2590    DeleteTableProcedureBiConsumer(TableName tableName) {
2591      super(tableName);
2592    }
2593
2594    @Override
2595    String getOperationType() {
2596      return "DELETE";
2597    }
2598
2599    @Override
2600    void onFinished() {
2601      connection.getLocator().clearCache(this.tableName);
2602      super.onFinished();
2603    }
2604  }
2605
2606  private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2607
2608    TruncateTableProcedureBiConsumer(TableName tableName) {
2609      super(tableName);
2610    }
2611
2612    @Override
2613    String getOperationType() {
2614      return "TRUNCATE";
2615    }
2616  }
2617
2618  private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2619
2620    EnableTableProcedureBiConsumer(TableName tableName) {
2621      super(tableName);
2622    }
2623
2624    @Override
2625    String getOperationType() {
2626      return "ENABLE";
2627    }
2628  }
2629
2630  private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2631
2632    DisableTableProcedureBiConsumer(TableName tableName) {
2633      super(tableName);
2634    }
2635
2636    @Override
2637    String getOperationType() {
2638      return "DISABLE";
2639    }
2640  }
2641
2642  private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2643
2644    AddColumnFamilyProcedureBiConsumer(TableName tableName) {
2645      super(tableName);
2646    }
2647
2648    @Override
2649    String getOperationType() {
2650      return "ADD_COLUMN_FAMILY";
2651    }
2652  }
2653
2654  private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2655
2656    DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
2657      super(tableName);
2658    }
2659
2660    @Override
2661    String getOperationType() {
2662      return "DELETE_COLUMN_FAMILY";
2663    }
2664  }
2665
2666  private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2667
2668    ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
2669      super(tableName);
2670    }
2671
2672    @Override
2673    String getOperationType() {
2674      return "MODIFY_COLUMN_FAMILY";
2675    }
2676  }
2677
2678  private static class ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer
2679    extends TableProcedureBiConsumer {
2680
2681    ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(TableName tableName) {
2682      super(tableName);
2683    }
2684
2685    @Override
2686    String getOperationType() {
2687      return "MODIFY_COLUMN_FAMILY_STORE_FILE_TRACKER";
2688    }
2689  }
2690
2691  private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2692
2693    CreateNamespaceProcedureBiConsumer(String namespaceName) {
2694      super(namespaceName);
2695    }
2696
2697    @Override
2698    String getOperationType() {
2699      return "CREATE_NAMESPACE";
2700    }
2701  }
2702
2703  private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2704
2705    DeleteNamespaceProcedureBiConsumer(String namespaceName) {
2706      super(namespaceName);
2707    }
2708
2709    @Override
2710    String getOperationType() {
2711      return "DELETE_NAMESPACE";
2712    }
2713  }
2714
2715  private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2716
2717    ModifyNamespaceProcedureBiConsumer(String namespaceName) {
2718      super(namespaceName);
2719    }
2720
2721    @Override
2722    String getOperationType() {
2723      return "MODIFY_NAMESPACE";
2724    }
2725  }
2726
2727  private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2728
2729    MergeTableRegionProcedureBiConsumer(TableName tableName) {
2730      super(tableName);
2731    }
2732
2733    @Override
2734    String getOperationType() {
2735      return "MERGE_REGIONS";
2736    }
2737  }
2738
2739  private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2740
2741    SplitTableRegionProcedureBiConsumer(TableName tableName) {
2742      super(tableName);
2743    }
2744
2745    @Override
2746    String getOperationType() {
2747      return "SPLIT_REGION";
2748    }
2749  }
2750
2751  private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
2752    private final String peerId;
2753    private final Supplier<String> getOperation;
2754
2755    ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) {
2756      this.peerId = peerId;
2757      this.getOperation = getOperation;
2758    }
2759
2760    String getDescription() {
2761      return "Operation: " + getOperation.get() + ", peerId: " + peerId;
2762    }
2763
2764    @Override
2765    void onFinished() {
2766      LOG.info(getDescription() + " completed");
2767    }
2768
2769    @Override
2770    void onError(Throwable error) {
2771      LOG.info(getDescription() + " failed with " + error.getMessage());
2772    }
2773  }
2774
2775  private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
2776    CompletableFuture<Void> future = new CompletableFuture<>();
2777    addListener(procFuture, (procId, error) -> {
2778      if (error != null) {
2779        future.completeExceptionally(error);
2780        return;
2781      }
2782      getProcedureResult(procId, future, 0);
2783    });
2784    return future;
2785  }
2786
2787  private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) {
2788    addListener(
2789      this.<GetProcedureResultResponse> newMasterCaller()
2790        .action((controller, stub) -> this.<GetProcedureResultRequest, GetProcedureResultResponse,
2791          GetProcedureResultResponse> call(controller, stub,
2792            GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
2793            (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
2794        .call(),
2795      (response, error) -> {
2796        if (error != null) {
2797          LOG.warn("failed to get the procedure result procId={}", procId,
2798            ConnectionUtils.translateException(error));
2799          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2800            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2801          return;
2802        }
2803        if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
2804          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2805            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2806          return;
2807        }
2808        if (response.hasException()) {
2809          IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
2810          future.completeExceptionally(ioe);
2811        } else {
2812          future.complete(null);
2813        }
2814      });
2815  }
2816
2817  private <T> CompletableFuture<T> failedFuture(Throwable error) {
2818    CompletableFuture<T> future = new CompletableFuture<>();
2819    future.completeExceptionally(error);
2820    return future;
2821  }
2822
2823  private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) {
2824    if (error != null) {
2825      future.completeExceptionally(error);
2826      return true;
2827    }
2828    return false;
2829  }
2830
2831  @Override
2832  public CompletableFuture<ClusterMetrics> getClusterMetrics() {
2833    return getClusterMetrics(EnumSet.allOf(Option.class));
2834  }
2835
2836  @Override
2837  public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) {
2838    return this.<ClusterMetrics> newMasterCaller()
2839      .action((controller, stub) -> this.<GetClusterStatusRequest, GetClusterStatusResponse,
2840        ClusterMetrics> call(controller, stub,
2841          RequestConverter.buildGetClusterStatusRequest(options),
2842          (s, c, req, done) -> s.getClusterStatus(c, req, done),
2843          resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus())))
2844      .call();
2845  }
2846
2847  @Override
2848  public CompletableFuture<Void> shutdown() {
2849    return this.<Void> newMasterCaller().priority(HIGH_QOS)
2850      .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
2851        stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done),
2852        resp -> null))
2853      .call();
2854  }
2855
2856  @Override
2857  public CompletableFuture<Void> stopMaster() {
2858    return this.<Void> newMasterCaller().priority(HIGH_QOS)
2859      .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(
2860        controller, stub, StopMasterRequest.newBuilder().build(),
2861        (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null))
2862      .call();
2863  }
2864
2865  @Override
2866  public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
2867    StopServerRequest request = RequestConverter
2868      .buildStopServerRequest("Called by admin client " + this.connection.toString());
2869    return this.<Void> newAdminCaller().priority(HIGH_QOS)
2870      .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
2871        controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
2872        resp -> null))
2873      .serverName(serverName).call();
2874  }
2875
2876  @Override
2877  public CompletableFuture<Void> updateConfiguration(ServerName serverName) {
2878    return this.<Void> newAdminCaller()
2879      .action((controller, stub) -> this.<UpdateConfigurationRequest, UpdateConfigurationResponse,
2880        Void> adminCall(controller, stub, UpdateConfigurationRequest.getDefaultInstance(),
2881          (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null))
2882      .serverName(serverName).call();
2883  }
2884
2885  @Override
2886  public CompletableFuture<Void> updateConfiguration() {
2887    CompletableFuture<Void> future = new CompletableFuture<Void>();
2888    addListener(
2889      getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)),
2890      (status, err) -> {
2891        if (err != null) {
2892          future.completeExceptionally(err);
2893        } else {
2894          List<CompletableFuture<Void>> futures = new ArrayList<>();
2895          status.getServersName().forEach(server -> futures.add(updateConfiguration(server)));
2896          futures.add(updateConfiguration(status.getMasterName()));
2897          status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
2898          addListener(
2899            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
2900            (result, err2) -> {
2901              if (err2 != null) {
2902                future.completeExceptionally(err2);
2903              } else {
2904                future.complete(result);
2905              }
2906            });
2907        }
2908      });
2909    return future;
2910  }
2911
2912  @Override
2913  public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
2914    return this.<Void> newAdminCaller()
2915      .action((controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse,
2916        Void> adminCall(controller, stub, RequestConverter.buildRollWALWriterRequest(),
2917          (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null))
2918      .serverName(serverName).call();
2919  }
2920
2921  @Override
2922  public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
2923    return this.<Void> newAdminCaller()
2924      .action((controller, stub) -> this.<ClearCompactionQueuesRequest,
2925        ClearCompactionQueuesResponse, Void> adminCall(controller, stub,
2926          RequestConverter.buildClearCompactionQueuesRequest(queues),
2927          (s, c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
2928      .serverName(serverName).call();
2929  }
2930
2931  @Override
2932  public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
2933    return this.<List<SecurityCapability>> newMasterCaller()
2934      .action((controller, stub) -> this.<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse,
2935        List<SecurityCapability>> call(controller, stub,
2936          SecurityCapabilitiesRequest.newBuilder().build(),
2937          (s, c, req, done) -> s.getSecurityCapabilities(c, req, done),
2938          (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList())))
2939      .call();
2940  }
2941
2942  @Override
2943  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) {
2944    return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName);
2945  }
2946
2947  @Override
2948  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName,
2949    TableName tableName) {
2950    Preconditions.checkNotNull(tableName,
2951      "tableName is null. If you don't specify a tableName, use getRegionLoads() instead");
2952    return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName);
2953  }
2954
2955  private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request,
2956    ServerName serverName) {
2957    return this.<List<RegionMetrics>> newAdminCaller()
2958      .action((controller, stub) -> this.<GetRegionLoadRequest, GetRegionLoadResponse,
2959        List<RegionMetrics>> adminCall(controller, stub, request,
2960          (s, c, req, done) -> s.getRegionLoad(controller, req, done),
2961          RegionMetricsBuilder::toRegionMetrics))
2962      .serverName(serverName).call();
2963  }
2964
2965  @Override
2966  public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
2967    return this.<Boolean> newMasterCaller()
2968      .action((controller, stub) -> this.<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse,
2969        Boolean> call(controller, stub, IsInMaintenanceModeRequest.newBuilder().build(),
2970          (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
2971          resp -> resp.getInMaintenanceMode()))
2972      .call();
2973  }
2974
2975  @Override
2976  public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
2977    CompactType compactType) {
2978    CompletableFuture<CompactionState> future = new CompletableFuture<>();
2979
2980    switch (compactType) {
2981      case MOB:
2982        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
2983          if (err != null) {
2984            future.completeExceptionally(err);
2985            return;
2986          }
2987          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
2988
2989          addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
2990            .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse,
2991              GetRegionInfoResponse> adminCall(controller, stub,
2992                RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
2993                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
2994            .call(), (resp2, err2) -> {
2995              if (err2 != null) {
2996                future.completeExceptionally(err2);
2997              } else {
2998                if (resp2.hasCompactionState()) {
2999                  future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3000                } else {
3001                  future.complete(CompactionState.NONE);
3002                }
3003              }
3004            });
3005        });
3006        break;
3007      case NORMAL:
3008        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3009          if (err != null) {
3010            future.completeExceptionally(err);
3011            return;
3012          }
3013          ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>();
3014          List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
3015          locations.stream().filter(loc -> loc.getServerName() != null)
3016            .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
3017            .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
3018              futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
3019                // If any region compaction state is MAJOR_AND_MINOR
3020                // the table compaction state is MAJOR_AND_MINOR, too.
3021                if (err2 != null) {
3022                  future.completeExceptionally(unwrapCompletionException(err2));
3023                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
3024                  future.complete(regionState);
3025                } else {
3026                  regionStates.add(regionState);
3027                }
3028              }));
3029            });
3030          addListener(
3031            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3032            (ret, err3) -> {
3033              // If future not completed, check all regions's compaction state
3034              if (!future.isCompletedExceptionally() && !future.isDone()) {
3035                CompactionState state = CompactionState.NONE;
3036                for (CompactionState regionState : regionStates) {
3037                  switch (regionState) {
3038                    case MAJOR:
3039                      if (state == CompactionState.MINOR) {
3040                        future.complete(CompactionState.MAJOR_AND_MINOR);
3041                      } else {
3042                        state = CompactionState.MAJOR;
3043                      }
3044                      break;
3045                    case MINOR:
3046                      if (state == CompactionState.MAJOR) {
3047                        future.complete(CompactionState.MAJOR_AND_MINOR);
3048                      } else {
3049                        state = CompactionState.MINOR;
3050                      }
3051                      break;
3052                    case NONE:
3053                    default:
3054                  }
3055                }
3056                if (!future.isDone()) {
3057                  future.complete(state);
3058                }
3059              }
3060            });
3061        });
3062        break;
3063      default:
3064        throw new IllegalArgumentException("Unknown compactType: " + compactType);
3065    }
3066
3067    return future;
3068  }
3069
3070  @Override
3071  public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
3072    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3073    addListener(getRegionLocation(regionName), (location, err) -> {
3074      if (err != null) {
3075        future.completeExceptionally(err);
3076        return;
3077      }
3078      ServerName serverName = location.getServerName();
3079      if (serverName == null) {
3080        future
3081          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
3082        return;
3083      }
3084      addListener(
3085        this.<GetRegionInfoResponse> newAdminCaller()
3086          .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse,
3087            GetRegionInfoResponse> adminCall(controller, stub,
3088              RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(),
3089                true),
3090              (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3091          .serverName(serverName).call(),
3092        (resp2, err2) -> {
3093          if (err2 != null) {
3094            future.completeExceptionally(err2);
3095          } else {
3096            if (resp2.hasCompactionState()) {
3097              future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3098            } else {
3099              future.complete(CompactionState.NONE);
3100            }
3101          }
3102        });
3103    });
3104    return future;
3105  }
3106
3107  @Override
3108  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
3109    MajorCompactionTimestampRequest request = MajorCompactionTimestampRequest.newBuilder()
3110      .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
3111    return this.<Optional<Long>> newMasterCaller()
3112      .action((controller, stub) -> this.<MajorCompactionTimestampRequest,
3113        MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, request,
3114          (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done),
3115          ProtobufUtil::toOptionalTimestamp))
3116      .call();
3117  }
3118
3119  @Override
3120  public CompletableFuture<Optional<Long>>
3121    getLastMajorCompactionTimestampForRegion(byte[] regionName) {
3122    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
3123    // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
3124    addListener(getRegionInfo(regionName), (region, err) -> {
3125      if (err != null) {
3126        future.completeExceptionally(err);
3127        return;
3128      }
3129      MajorCompactionTimestampForRegionRequest.Builder builder =
3130        MajorCompactionTimestampForRegionRequest.newBuilder();
3131      builder.setRegion(
3132        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
3133      addListener(this.<Optional<Long>> newMasterCaller()
3134        .action((controller, stub) -> this.<MajorCompactionTimestampForRegionRequest,
3135          MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, builder.build(),
3136            (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done),
3137            ProtobufUtil::toOptionalTimestamp))
3138        .call(), (timestamp, err2) -> {
3139          if (err2 != null) {
3140            future.completeExceptionally(err2);
3141          } else {
3142            future.complete(timestamp);
3143          }
3144        });
3145    });
3146    return future;
3147  }
3148
3149  @Override
3150  public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
3151    List<String> serverNamesList) {
3152    CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>();
3153    addListener(getRegionServerList(serverNamesList), (serverNames, err) -> {
3154      if (err != null) {
3155        future.completeExceptionally(err);
3156        return;
3157      }
3158      // Accessed by multiple threads.
3159      Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size());
3160      List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size());
3161      serverNames.stream().forEach(serverName -> {
3162        futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
3163          if (err2 != null) {
3164            future.completeExceptionally(unwrapCompletionException(err2));
3165          } else {
3166            serverStates.put(serverName, serverState);
3167          }
3168        }));
3169      });
3170      addListener(
3171        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3172        (ret, err3) -> {
3173          if (!future.isCompletedExceptionally()) {
3174            if (err3 != null) {
3175              future.completeExceptionally(err3);
3176            } else {
3177              future.complete(serverStates);
3178            }
3179          }
3180        });
3181    });
3182    return future;
3183  }
3184
3185  private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) {
3186    CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
3187    if (serverNamesList.isEmpty()) {
3188      CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture =
3189        getClusterMetrics(EnumSet.of(Option.SERVERS_NAME));
3190      addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> {
3191        if (err != null) {
3192          future.completeExceptionally(err);
3193        } else {
3194          future.complete(clusterMetrics.getServersName());
3195        }
3196      });
3197      return future;
3198    } else {
3199      List<ServerName> serverList = new ArrayList<>();
3200      for (String regionServerName : serverNamesList) {
3201        ServerName serverName = null;
3202        try {
3203          serverName = ServerName.valueOf(regionServerName);
3204        } catch (Exception e) {
3205          future.completeExceptionally(
3206            new IllegalArgumentException(String.format("ServerName format: %s", regionServerName)));
3207        }
3208        if (serverName == null) {
3209          future.completeExceptionally(
3210            new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName)));
3211        } else {
3212          serverList.add(serverName);
3213        }
3214      }
3215      future.complete(serverList);
3216    }
3217    return future;
3218  }
3219
3220  private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) {
3221    return this.<Boolean> newAdminCaller().serverName(serverName)
3222      .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse,
3223        Boolean> adminCall(controller, stub,
3224          CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(),
3225          (s, c, req, done) -> s.compactionSwitch(c, req, done), resp -> resp.getPrevState()))
3226      .call();
3227  }
3228
3229  @Override
3230  public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) {
3231    return this.<Boolean> newMasterCaller()
3232      .action((controller, stub) -> this.<SetBalancerRunningRequest, SetBalancerRunningResponse,
3233        Boolean> call(controller, stub,
3234          RequestConverter.buildSetBalancerRunningRequest(on, drainRITs),
3235          (s, c, req, done) -> s.setBalancerRunning(c, req, done),
3236          (resp) -> resp.getPrevBalanceValue()))
3237      .call();
3238  }
3239
3240  @Override
3241  public CompletableFuture<BalanceResponse> balance(BalanceRequest request) {
3242    return this.<BalanceResponse> newMasterCaller()
3243      .action((controller, stub) -> this.<MasterProtos.BalanceRequest, MasterProtos.BalanceResponse,
3244        BalanceResponse> call(controller, stub, ProtobufUtil.toBalanceRequest(request),
3245          (s, c, req, done) -> s.balance(c, req, done),
3246          (resp) -> ProtobufUtil.toBalanceResponse(resp)))
3247      .call();
3248  }
3249
3250  @Override
3251  public CompletableFuture<Boolean> isBalancerEnabled() {
3252    return this.<Boolean> newMasterCaller()
3253      .action((controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse,
3254        Boolean> call(controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
3255          (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3256      .call();
3257  }
3258
3259  @Override
3260  public CompletableFuture<Boolean> normalizerSwitch(boolean on) {
3261    return this.<Boolean> newMasterCaller()
3262      .action((controller, stub) -> this.<SetNormalizerRunningRequest, SetNormalizerRunningResponse,
3263        Boolean> call(controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on),
3264          (s, c, req, done) -> s.setNormalizerRunning(c, req, done),
3265          (resp) -> resp.getPrevNormalizerValue()))
3266      .call();
3267  }
3268
3269  @Override
3270  public CompletableFuture<Boolean> isNormalizerEnabled() {
3271    return this.<Boolean> newMasterCaller()
3272      .action((controller, stub) -> this.<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse,
3273        Boolean> call(controller, stub, RequestConverter.buildIsNormalizerEnabledRequest(),
3274          (s, c, req, done) -> s.isNormalizerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3275      .call();
3276  }
3277
3278  @Override
3279  public CompletableFuture<Boolean> normalize(NormalizeTableFilterParams ntfp) {
3280    return normalize(RequestConverter.buildNormalizeRequest(ntfp));
3281  }
3282
3283  private CompletableFuture<Boolean> normalize(NormalizeRequest request) {
3284    return this.<Boolean> newMasterCaller().action((controller, stub) -> this.call(controller, stub,
3285      request, MasterService.Interface::normalize, NormalizeResponse::getNormalizerRan)).call();
3286  }
3287
3288  @Override
3289  public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) {
3290    return this.<Boolean> newMasterCaller()
3291      .action((controller, stub) -> this.<SetCleanerChoreRunningRequest,
3292        SetCleanerChoreRunningResponse, Boolean> call(controller, stub,
3293          RequestConverter.buildSetCleanerChoreRunningRequest(enabled),
3294          (s, c, req, done) -> s.setCleanerChoreRunning(c, req, done),
3295          (resp) -> resp.getPrevValue()))
3296      .call();
3297  }
3298
3299  @Override
3300  public CompletableFuture<Boolean> isCleanerChoreEnabled() {
3301    return this.<Boolean> newMasterCaller()
3302      .action(
3303        (controller, stub) -> this.<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse,
3304          Boolean> call(controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(),
3305            (s, c, req, done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue()))
3306      .call();
3307  }
3308
3309  @Override
3310  public CompletableFuture<Boolean> runCleanerChore() {
3311    return this.<Boolean> newMasterCaller()
3312      .action((controller, stub) -> this.<RunCleanerChoreRequest, RunCleanerChoreResponse,
3313        Boolean> call(controller, stub, RequestConverter.buildRunCleanerChoreRequest(),
3314          (s, c, req, done) -> s.runCleanerChore(c, req, done),
3315          (resp) -> resp.getCleanerChoreRan()))
3316      .call();
3317  }
3318
3319  @Override
3320  public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) {
3321    return this.<Boolean> newMasterCaller()
3322      .action((controller, stub) -> this.<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse,
3323        Boolean> call(controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled),
3324          (s, c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp.getPrevValue()))
3325      .call();
3326  }
3327
3328  @Override
3329  public CompletableFuture<Boolean> isCatalogJanitorEnabled() {
3330    return this.<Boolean> newMasterCaller()
3331      .action((controller, stub) -> this.<IsCatalogJanitorEnabledRequest,
3332        IsCatalogJanitorEnabledResponse, Boolean> call(controller, stub,
3333          RequestConverter.buildIsCatalogJanitorEnabledRequest(),
3334          (s, c, req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp.getValue()))
3335      .call();
3336  }
3337
3338  @Override
3339  public CompletableFuture<Integer> runCatalogJanitor() {
3340    return this.<Integer> newMasterCaller()
3341      .action((controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse,
3342        Integer> call(controller, stub, RequestConverter.buildCatalogScanRequest(),
3343          (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
3344      .call();
3345  }
3346
3347  @Override
3348  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3349    ServiceCaller<S, R> callable) {
3350    MasterCoprocessorRpcChannelImpl channel =
3351      new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
3352    S stub = stubMaker.apply(channel);
3353    CompletableFuture<R> future = new CompletableFuture<>();
3354    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3355    callable.call(stub, controller, resp -> {
3356      if (controller.failed()) {
3357        future.completeExceptionally(controller.getFailed());
3358      } else {
3359        future.complete(resp);
3360      }
3361    });
3362    return future;
3363  }
3364
3365  @Override
3366  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3367    ServiceCaller<S, R> callable, ServerName serverName) {
3368    RegionServerCoprocessorRpcChannelImpl channel = new RegionServerCoprocessorRpcChannelImpl(
3369      this.<Message> newServerCaller().serverName(serverName));
3370    S stub = stubMaker.apply(channel);
3371    CompletableFuture<R> future = new CompletableFuture<>();
3372    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3373    callable.call(stub, controller, resp -> {
3374      if (controller.failed()) {
3375        future.completeExceptionally(controller.getFailed());
3376      } else {
3377        future.complete(resp);
3378      }
3379    });
3380    return future;
3381  }
3382
3383  @Override
3384  public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
3385    return this.<List<ServerName>> newMasterCaller()
3386      .action((controller, stub) -> this.<ClearDeadServersRequest, ClearDeadServersResponse,
3387        List<ServerName>> call(controller, stub,
3388          RequestConverter.buildClearDeadServersRequest(servers),
3389          (s, c, req, done) -> s.clearDeadServers(c, req, done),
3390          (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
3391      .call();
3392  }
3393
3394  private <T> ServerRequestCallerBuilder<T> newServerCaller() {
3395    return this.connection.callerFactory.<T> serverRequest()
3396      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
3397      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
3398      .pause(pauseNs, TimeUnit.NANOSECONDS)
3399      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
3400      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
3401  }
3402
3403  @Override
3404  public CompletableFuture<Void> enableTableReplication(TableName tableName) {
3405    if (tableName == null) {
3406      return failedFuture(new IllegalArgumentException("Table name is null"));
3407    }
3408    CompletableFuture<Void> future = new CompletableFuture<>();
3409    addListener(tableExists(tableName), (exist, err) -> {
3410      if (err != null) {
3411        future.completeExceptionally(err);
3412        return;
3413      }
3414      if (!exist) {
3415        future.completeExceptionally(new TableNotFoundException(
3416          "Table '" + tableName.getNameAsString() + "' does not exists."));
3417        return;
3418      }
3419      addListener(getTableSplits(tableName), (splits, err1) -> {
3420        if (err1 != null) {
3421          future.completeExceptionally(err1);
3422        } else {
3423          addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> {
3424            if (err2 != null) {
3425              future.completeExceptionally(err2);
3426            } else {
3427              addListener(setTableReplication(tableName, true), (result3, err3) -> {
3428                if (err3 != null) {
3429                  future.completeExceptionally(err3);
3430                } else {
3431                  future.complete(result3);
3432                }
3433              });
3434            }
3435          });
3436        }
3437      });
3438    });
3439    return future;
3440  }
3441
3442  @Override
3443  public CompletableFuture<Void> disableTableReplication(TableName tableName) {
3444    if (tableName == null) {
3445      return failedFuture(new IllegalArgumentException("Table name is null"));
3446    }
3447    CompletableFuture<Void> future = new CompletableFuture<>();
3448    addListener(tableExists(tableName), (exist, err) -> {
3449      if (err != null) {
3450        future.completeExceptionally(err);
3451        return;
3452      }
3453      if (!exist) {
3454        future.completeExceptionally(new TableNotFoundException(
3455          "Table '" + tableName.getNameAsString() + "' does not exists."));
3456        return;
3457      }
3458      addListener(setTableReplication(tableName, false), (result, err2) -> {
3459        if (err2 != null) {
3460          future.completeExceptionally(err2);
3461        } else {
3462          future.complete(result);
3463        }
3464      });
3465    });
3466    return future;
3467  }
3468
3469  private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
3470    CompletableFuture<byte[][]> future = new CompletableFuture<>();
3471    addListener(
3472      getRegions(tableName).thenApply(regions -> regions.stream()
3473        .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())),
3474      (regions, err2) -> {
3475        if (err2 != null) {
3476          future.completeExceptionally(err2);
3477          return;
3478        }
3479        if (regions.size() == 1) {
3480          future.complete(null);
3481        } else {
3482          byte[][] splits = new byte[regions.size() - 1][];
3483          for (int i = 1; i < regions.size(); i++) {
3484            splits[i - 1] = regions.get(i).getStartKey();
3485          }
3486          future.complete(splits);
3487        }
3488      });
3489    return future;
3490  }
3491
3492  /**
3493   * Connect to peer and check the table descriptor on peer:
3494   * <ol>
3495   * <li>Create the same table on peer when not exist.</li>
3496   * <li>Throw an exception if the table already has replication enabled on any of the column
3497   * families.</li>
3498   * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
3499   * </ol>
3500   * @param tableName name of the table to sync to the peer
3501   * @param splits    table split keys
3502   */
3503  private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
3504    byte[][] splits) {
3505    CompletableFuture<Void> future = new CompletableFuture<>();
3506    addListener(listReplicationPeers(), (peers, err) -> {
3507      if (err != null) {
3508        future.completeExceptionally(err);
3509        return;
3510      }
3511      if (peers == null || peers.size() <= 0) {
3512        future.completeExceptionally(
3513          new IllegalArgumentException("Found no peer cluster for replication."));
3514        return;
3515      }
3516      List<CompletableFuture<Void>> futures = new ArrayList<>();
3517      peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
3518        .forEach(peer -> {
3519          futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
3520        });
3521      addListener(
3522        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3523        (result, err2) -> {
3524          if (err2 != null) {
3525            future.completeExceptionally(err2);
3526          } else {
3527            future.complete(result);
3528          }
3529        });
3530    });
3531    return future;
3532  }
3533
3534  private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
3535    ReplicationPeerDescription peer) {
3536    Configuration peerConf = null;
3537    try {
3538      peerConf =
3539        ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer);
3540    } catch (IOException e) {
3541      return failedFuture(e);
3542    }
3543    CompletableFuture<Void> future = new CompletableFuture<>();
3544    addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> {
3545      if (err != null) {
3546        future.completeExceptionally(err);
3547        return;
3548      }
3549      addListener(getDescriptor(tableName), (tableDesc, err1) -> {
3550        if (err1 != null) {
3551          future.completeExceptionally(err1);
3552          return;
3553        }
3554        AsyncAdmin peerAdmin = conn.getAdmin();
3555        addListener(peerAdmin.tableExists(tableName), (exist, err2) -> {
3556          if (err2 != null) {
3557            future.completeExceptionally(err2);
3558            return;
3559          }
3560          if (!exist) {
3561            CompletableFuture<Void> createTableFuture = null;
3562            if (splits == null) {
3563              createTableFuture = peerAdmin.createTable(tableDesc);
3564            } else {
3565              createTableFuture = peerAdmin.createTable(tableDesc, splits);
3566            }
3567            addListener(createTableFuture, (result, err3) -> {
3568              if (err3 != null) {
3569                future.completeExceptionally(err3);
3570              } else {
3571                future.complete(result);
3572              }
3573            });
3574          } else {
3575            addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin),
3576              (result, err4) -> {
3577                if (err4 != null) {
3578                  future.completeExceptionally(err4);
3579                } else {
3580                  future.complete(result);
3581                }
3582              });
3583          }
3584        });
3585      });
3586    });
3587    return future;
3588  }
3589
3590  private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
3591    TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
3592    CompletableFuture<Void> future = new CompletableFuture<>();
3593    addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> {
3594      if (err != null) {
3595        future.completeExceptionally(err);
3596        return;
3597      }
3598      if (peerTableDesc == null) {
3599        future.completeExceptionally(
3600          new IllegalArgumentException("Failed to get table descriptor for table "
3601            + tableName.getNameAsString() + " from peer cluster " + peer.getPeerId()));
3602        return;
3603      }
3604      if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
3605        future.completeExceptionally(new IllegalArgumentException(
3606          "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId()
3607            + ", but the table descriptors are not same when compared with source cluster."
3608            + " Thus can not enable the table's replication switch."));
3609        return;
3610      }
3611      future.complete(null);
3612    });
3613    return future;
3614  }
3615
3616  /**
3617   * Set the table's replication switch if the table's replication switch is already not set.
3618   * @param tableName name of the table
3619   * @param enableRep is replication switch enable or disable
3620   */
3621  private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
3622    CompletableFuture<Void> future = new CompletableFuture<>();
3623    addListener(getDescriptor(tableName), (tableDesc, err) -> {
3624      if (err != null) {
3625        future.completeExceptionally(err);
3626        return;
3627      }
3628      if (!tableDesc.matchReplicationScope(enableRep)) {
3629        int scope =
3630          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
3631        TableDescriptor newTableDesc =
3632          TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
3633        addListener(modifyTable(newTableDesc), (result, err2) -> {
3634          if (err2 != null) {
3635            future.completeExceptionally(err2);
3636          } else {
3637            future.complete(result);
3638          }
3639        });
3640      } else {
3641        future.complete(null);
3642      }
3643    });
3644    return future;
3645  }
3646
3647  @Override
3648  public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
3649    CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
3650    addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3651      if (err != null) {
3652        future.completeExceptionally(err);
3653        return;
3654      }
3655      Map<ServerName, List<RegionInfo>> regionInfoByServerName =
3656        locations.stream().filter(l -> l.getRegion() != null)
3657          .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
3658          .collect(Collectors.groupingBy(l -> l.getServerName(),
3659            Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
3660      List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
3661      CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
3662      for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
3663        futures
3664          .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
3665            if (err2 != null) {
3666              future.completeExceptionally(unwrapCompletionException(err2));
3667            } else {
3668              aggregator.append(stats);
3669            }
3670          }));
3671      }
3672      addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
3673        (ret, err3) -> {
3674          if (err3 != null) {
3675            future.completeExceptionally(unwrapCompletionException(err3));
3676          } else {
3677            future.complete(aggregator.sum());
3678          }
3679        });
3680    });
3681    return future;
3682  }
3683
3684  @Override
3685  public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName,
3686    boolean preserveSplits) {
3687    CompletableFuture<Void> future = new CompletableFuture<>();
3688    addListener(tableExists(tableName), (exist, err) -> {
3689      if (err != null) {
3690        future.completeExceptionally(err);
3691        return;
3692      }
3693      if (!exist) {
3694        future.completeExceptionally(new TableNotFoundException(tableName));
3695        return;
3696      }
3697      addListener(tableExists(newTableName), (exist1, err1) -> {
3698        if (err1 != null) {
3699          future.completeExceptionally(err1);
3700          return;
3701        }
3702        if (exist1) {
3703          future.completeExceptionally(new TableExistsException(newTableName));
3704          return;
3705        }
3706        addListener(getDescriptor(tableName), (tableDesc, err2) -> {
3707          if (err2 != null) {
3708            future.completeExceptionally(err2);
3709            return;
3710          }
3711          TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc);
3712          if (preserveSplits) {
3713            addListener(getTableSplits(tableName), (splits, err3) -> {
3714              if (err3 != null) {
3715                future.completeExceptionally(err3);
3716              } else {
3717                addListener(
3718                  splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc),
3719                  (result, err4) -> {
3720                    if (err4 != null) {
3721                      future.completeExceptionally(err4);
3722                    } else {
3723                      future.complete(result);
3724                    }
3725                  });
3726              }
3727            });
3728          } else {
3729            addListener(createTable(newTableDesc), (result, err5) -> {
3730              if (err5 != null) {
3731                future.completeExceptionally(err5);
3732              } else {
3733                future.complete(result);
3734              }
3735            });
3736          }
3737        });
3738      });
3739    });
3740    return future;
3741  }
3742
3743  private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
3744    List<RegionInfo> hris) {
3745    return this.<CacheEvictionStats> newAdminCaller()
3746      .action((controller, stub) -> this.<ClearRegionBlockCacheRequest,
3747        ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(controller, stub,
3748          RequestConverter.buildClearRegionBlockCacheRequest(hris),
3749          (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
3750          resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
3751      .serverName(serverName).call();
3752  }
3753
3754  @Override
3755  public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
3756    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3757      .action((controller, stub) -> this.<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse,
3758        Boolean> call(controller, stub,
3759          SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(),
3760          (s, c, req, done) -> s.switchRpcThrottle(c, req, done),
3761          resp -> resp.getPreviousRpcThrottleEnabled()))
3762      .call();
3763    return future;
3764  }
3765
3766  @Override
3767  public CompletableFuture<Boolean> isRpcThrottleEnabled() {
3768    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3769      .action((controller, stub) -> this.<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse,
3770        Boolean> call(controller, stub, IsRpcThrottleEnabledRequest.newBuilder().build(),
3771          (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done),
3772          resp -> resp.getRpcThrottleEnabled()))
3773      .call();
3774    return future;
3775  }
3776
3777  @Override
3778  public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) {
3779    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3780      .action((controller, stub) -> this.<SwitchExceedThrottleQuotaRequest,
3781        SwitchExceedThrottleQuotaResponse, Boolean> call(controller, stub,
3782          SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable)
3783            .build(),
3784          (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done),
3785          resp -> resp.getPreviousExceedThrottleQuotaEnabled()))
3786      .call();
3787    return future;
3788  }
3789
3790  @Override
3791  public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
3792    return this.<Map<TableName, Long>> newMasterCaller()
3793      .action((controller, stub) -> this.<GetSpaceQuotaRegionSizesRequest,
3794        GetSpaceQuotaRegionSizesResponse, Map<TableName, Long>> call(controller, stub,
3795          RequestConverter.buildGetSpaceQuotaRegionSizesRequest(),
3796          (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done),
3797          resp -> resp.getSizesList().stream().collect(Collectors
3798            .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize))))
3799      .call();
3800  }
3801
3802  @Override
3803  public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>>
3804    getRegionServerSpaceQuotaSnapshots(ServerName serverName) {
3805    return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller()
3806      .action((controller, stub) -> this.<GetSpaceQuotaSnapshotsRequest,
3807        GetSpaceQuotaSnapshotsResponse, Map<TableName, SpaceQuotaSnapshot>> adminCall(controller,
3808          stub, RequestConverter.buildGetSpaceQuotaSnapshotsRequest(),
3809          (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done),
3810          resp -> resp.getSnapshotsList().stream()
3811            .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()),
3812              snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot())))))
3813      .serverName(serverName).call();
3814  }
3815
3816  private CompletableFuture<SpaceQuotaSnapshot>
3817    getCurrentSpaceQuotaSnapshot(Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) {
3818    return this.<SpaceQuotaSnapshot> newMasterCaller()
3819      .action((controller, stub) -> this.<GetQuotaStatesRequest, GetQuotaStatesResponse,
3820        SpaceQuotaSnapshot> call(controller, stub, RequestConverter.buildGetQuotaStatesRequest(),
3821          (s, c, req, done) -> s.getQuotaStates(c, req, done), converter))
3822      .call();
3823  }
3824
3825  @Override
3826  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) {
3827    return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream()
3828      .filter(s -> s.getNamespace().equals(namespace)).findFirst()
3829      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3830  }
3831
3832  @Override
3833  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) {
3834    HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName);
3835    return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream()
3836      .filter(s -> s.getTableName().equals(protoTableName)).findFirst()
3837      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3838  }
3839
3840  @Override
3841  public CompletableFuture<Void> grant(UserPermission userPermission,
3842    boolean mergeExistingPermissions) {
3843    return this.<Void> newMasterCaller()
3844      .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller, stub,
3845        ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions),
3846        (s, c, req, done) -> s.grant(c, req, done), resp -> null))
3847      .call();
3848  }
3849
3850  @Override
3851  public CompletableFuture<Void> revoke(UserPermission userPermission) {
3852    return this.<Void> newMasterCaller()
3853      .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller,
3854        stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission),
3855        (s, c, req, done) -> s.revoke(c, req, done), resp -> null))
3856      .call();
3857  }
3858
3859  @Override
3860  public CompletableFuture<List<UserPermission>>
3861    getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) {
3862    return this.<List<UserPermission>> newMasterCaller()
3863      .action((controller, stub) -> this.<AccessControlProtos.GetUserPermissionsRequest,
3864        GetUserPermissionsResponse, List<UserPermission>> call(controller, stub,
3865          ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest),
3866          (s, c, req, done) -> s.getUserPermissions(c, req, done),
3867          resp -> resp.getUserPermissionList().stream()
3868            .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm))
3869            .collect(Collectors.toList())))
3870      .call();
3871  }
3872
3873  @Override
3874  public CompletableFuture<List<Boolean>> hasUserPermissions(String userName,
3875    List<Permission> permissions) {
3876    return this.<List<Boolean>> newMasterCaller()
3877      .action((controller, stub) -> this.<HasUserPermissionsRequest, HasUserPermissionsResponse,
3878        List<Boolean>> call(controller, stub,
3879          ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions),
3880          (s, c, req, done) -> s.hasUserPermissions(c, req, done),
3881          resp -> resp.getHasUserPermissionList()))
3882      .call();
3883  }
3884
3885  @Override
3886  public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on, final boolean sync) {
3887    return this.<Boolean> newMasterCaller()
3888      .action((controller, stub) -> this.call(controller, stub,
3889        RequestConverter.buildSetSnapshotCleanupRequest(on, sync),
3890        MasterService.Interface::switchSnapshotCleanup,
3891        SetSnapshotCleanupResponse::getPrevSnapshotCleanup))
3892      .call();
3893  }
3894
3895  @Override
3896  public CompletableFuture<Boolean> isSnapshotCleanupEnabled() {
3897    return this.<Boolean> newMasterCaller()
3898      .action((controller, stub) -> this.call(controller, stub,
3899        RequestConverter.buildIsSnapshotCleanupEnabledRequest(),
3900        MasterService.Interface::isSnapshotCleanupEnabled,
3901        IsSnapshotCleanupEnabledResponse::getEnabled))
3902      .call();
3903  }
3904
3905  private CompletableFuture<List<LogEntry>> getSlowLogResponses(
3906    final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit,
3907    final String logType) {
3908    if (CollectionUtils.isEmpty(serverNames)) {
3909      return CompletableFuture.completedFuture(Collections.emptyList());
3910    }
3911    return CompletableFuture.supplyAsync(() -> serverNames
3912      .stream().map((ServerName serverName) -> getSlowLogResponseFromServer(serverName,
3913        filterParams, limit, logType))
3914      .map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList()));
3915  }
3916
3917  private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName,
3918    Map<String, Object> filterParams, int limit, String logType) {
3919    return this.<List<LogEntry>> newAdminCaller()
3920      .action((controller, stub) -> this.adminCall(controller, stub,
3921        RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType),
3922        AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads))
3923      .serverName(serverName).call();
3924  }
3925
3926  @Override
3927  public CompletableFuture<List<Boolean>>
3928    clearSlowLogResponses(@Nullable Set<ServerName> serverNames) {
3929    if (CollectionUtils.isEmpty(serverNames)) {
3930      return CompletableFuture.completedFuture(Collections.emptyList());
3931    }
3932    List<CompletableFuture<Boolean>> clearSlowLogResponseList =
3933      serverNames.stream().map(this::clearSlowLogsResponses).collect(Collectors.toList());
3934    return convertToFutureOfList(clearSlowLogResponseList);
3935  }
3936
3937  private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) {
3938    return this.<Boolean> newAdminCaller()
3939      .action((controller, stub) -> this.adminCall(controller, stub,
3940        RequestConverter.buildClearSlowLogResponseRequest(),
3941        AdminService.Interface::clearSlowLogsResponses, ProtobufUtil::toClearSlowLogPayload))
3942      .serverName(serverName).call();
3943  }
3944
3945  private static <T> CompletableFuture<List<T>>
3946    convertToFutureOfList(List<CompletableFuture<T>> futures) {
3947    CompletableFuture<Void> allDoneFuture =
3948      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
3949    return allDoneFuture
3950      .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
3951  }
3952
3953  private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) {
3954    return this.<List<LogEntry>> newMasterCaller()
3955      .action((controller, stub) -> this.call(controller, stub,
3956        ProtobufUtil.toBalancerDecisionRequest(limit), MasterService.Interface::getLogEntries,
3957        ProtobufUtil::toBalancerDecisionResponse))
3958      .call();
3959  }
3960
3961  private CompletableFuture<List<LogEntry>> getBalancerRejections(final int limit) {
3962    return this.<List<LogEntry>> newMasterCaller()
3963      .action((controller, stub) -> this.call(controller, stub,
3964        ProtobufUtil.toBalancerRejectionRequest(limit), MasterService.Interface::getLogEntries,
3965        ProtobufUtil::toBalancerRejectionResponse))
3966      .call();
3967  }
3968
3969  @Override
3970  public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames,
3971    String logType, ServerType serverType, int limit, Map<String, Object> filterParams) {
3972    if (logType == null || serverType == null) {
3973      throw new IllegalArgumentException("logType and/or serverType cannot be empty");
3974    }
3975    switch (logType) {
3976      case "SLOW_LOG":
3977      case "LARGE_LOG":
3978        if (ServerType.MASTER.equals(serverType)) {
3979          throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster");
3980        }
3981        return getSlowLogResponses(filterParams, serverNames, limit, logType);
3982      case "BALANCER_DECISION":
3983        if (ServerType.REGION_SERVER.equals(serverType)) {
3984          throw new IllegalArgumentException(
3985            "Balancer Decision logs are not maintained by HRegionServer");
3986        }
3987        return getBalancerDecisions(limit);
3988      case "BALANCER_REJECTION":
3989        if (ServerType.REGION_SERVER.equals(serverType)) {
3990          throw new IllegalArgumentException(
3991            "Balancer Rejection logs are not maintained by HRegionServer");
3992        }
3993        return getBalancerRejections(limit);
3994      default:
3995        return CompletableFuture.completedFuture(Collections.emptyList());
3996    }
3997  }
3998
3999  @Override
4000  public CompletableFuture<Void> flushMasterStore() {
4001    FlushMasterStoreRequest.Builder request = FlushMasterStoreRequest.newBuilder();
4002    return this.<Void> newMasterCaller()
4003      .action((controller, stub) -> this.<FlushMasterStoreRequest, FlushMasterStoreResponse,
4004        Void> call(controller, stub, request.build(),
4005          (s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null))
4006      .call();
4007  }
4008}