001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
024
025import edu.umd.cs.findbugs.annotations.Nullable;
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.EnumSet;
031import java.util.HashMap;
032import java.util.List;
033import java.util.Map;
034import java.util.Optional;
035import java.util.Set;
036import java.util.concurrent.CompletableFuture;
037import java.util.concurrent.ConcurrentHashMap;
038import java.util.concurrent.ConcurrentLinkedQueue;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.atomic.AtomicReference;
041import java.util.function.BiConsumer;
042import java.util.function.Consumer;
043import java.util.function.Function;
044import java.util.function.Supplier;
045import java.util.regex.Pattern;
046import java.util.stream.Collectors;
047import java.util.stream.Stream;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.hbase.CacheEvictionStats;
050import org.apache.hadoop.hbase.CacheEvictionStatsAggregator;
051import org.apache.hadoop.hbase.CatalogFamilyFormat;
052import org.apache.hadoop.hbase.ClientMetaTableAccessor;
053import org.apache.hadoop.hbase.ClusterMetrics;
054import org.apache.hadoop.hbase.ClusterMetrics.Option;
055import org.apache.hadoop.hbase.ClusterMetricsBuilder;
056import org.apache.hadoop.hbase.HConstants;
057import org.apache.hadoop.hbase.HRegionLocation;
058import org.apache.hadoop.hbase.NamespaceDescriptor;
059import org.apache.hadoop.hbase.RegionLocations;
060import org.apache.hadoop.hbase.RegionMetrics;
061import org.apache.hadoop.hbase.RegionMetricsBuilder;
062import org.apache.hadoop.hbase.ServerName;
063import org.apache.hadoop.hbase.TableExistsException;
064import org.apache.hadoop.hbase.TableName;
065import org.apache.hadoop.hbase.TableNotDisabledException;
066import org.apache.hadoop.hbase.TableNotEnabledException;
067import org.apache.hadoop.hbase.TableNotFoundException;
068import org.apache.hadoop.hbase.UnknownRegionException;
069import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
070import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
071import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
072import org.apache.hadoop.hbase.client.Scan.ReadType;
073import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
074import org.apache.hadoop.hbase.client.replication.TableCFs;
075import org.apache.hadoop.hbase.client.security.SecurityCapability;
076import org.apache.hadoop.hbase.exceptions.DeserializationException;
077import org.apache.hadoop.hbase.ipc.HBaseRpcController;
078import org.apache.hadoop.hbase.net.Address;
079import org.apache.hadoop.hbase.quotas.QuotaFilter;
080import org.apache.hadoop.hbase.quotas.QuotaSettings;
081import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
082import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
083import org.apache.hadoop.hbase.replication.ReplicationException;
084import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
085import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
086import org.apache.hadoop.hbase.replication.SyncReplicationState;
087import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
088import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
089import org.apache.hadoop.hbase.security.access.Permission;
090import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
091import org.apache.hadoop.hbase.security.access.UserPermission;
092import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
093import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
094import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
095import org.apache.hadoop.hbase.util.Bytes;
096import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
097import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
098import org.apache.hadoop.hbase.util.Pair;
099import org.apache.yetus.audience.InterfaceAudience;
100import org.slf4j.Logger;
101import org.slf4j.LoggerFactory;
102
103import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
104import org.apache.hbase.thirdparty.com.google.protobuf.Message;
105import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
106import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
107import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
108import org.apache.hbase.thirdparty.io.netty.util.Timeout;
109import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
110import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
111
112import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
113import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
114import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
115import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
116import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
117import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
147import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
148import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
149import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
150import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
153import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotCleanupEnabledResponse;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerRequest;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerResponse;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerRequest;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerResponse;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse;
258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse;
264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse;
269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
280import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
281import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
282import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
283import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
284import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
285import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
286import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
287import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
288import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
289import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
290import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
291import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
292import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
293import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
294import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest;
295import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse;
296import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest;
297import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupResponse;
298import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupRequest;
299import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupResponse;
300import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest;
301import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse;
302import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest;
303import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse;
304import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest;
305import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse;
306import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest;
307import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse;
308import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupRequest;
309import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupResponse;
310import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersRequest;
311import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersResponse;
312import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest;
313import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse;
314import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest;
315import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse;
316import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupRequest;
317import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupResponse;
318import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigRequest;
319import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigResponse;
320import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
321import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
322import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
323import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
324import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
325import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
326import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
327import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
328import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
329import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
330import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
331import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
332import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
333import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
334import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
335import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
336import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
337
338/**
339 * The implementation of AsyncAdmin.
340 * <p>
341 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
342 * be finished inside the rpc framework thread, which means that the callbacks registered to the
343 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
344 * this class should not try to do time consuming tasks in the callbacks.
345 * @since 2.0.0
346 * @see AsyncHBaseAdmin
347 * @see AsyncConnection#getAdmin()
348 * @see AsyncConnection#getAdminBuilder()
349 */
350@InterfaceAudience.Private
351class RawAsyncHBaseAdmin implements AsyncAdmin {
352
353  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
354
355  private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class);
356
357  private final AsyncConnectionImpl connection;
358
359  private final HashedWheelTimer retryTimer;
360
361  private final AsyncTable<AdvancedScanResultConsumer> metaTable;
362
363  private final long rpcTimeoutNs;
364
365  private final long operationTimeoutNs;
366
367  private final long pauseNs;
368
369  private final long pauseNsForServerOverloaded;
370
371  private final int maxAttempts;
372
373  private final int startLogErrorsCnt;
374
375  private final NonceGenerator ng;
376
377  RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer,
378    AsyncAdminBuilderBase builder) {
379    this.connection = connection;
380    this.retryTimer = retryTimer;
381    this.metaTable = connection.getTable(META_TABLE_NAME);
382    this.rpcTimeoutNs = builder.rpcTimeoutNs;
383    this.operationTimeoutNs = builder.operationTimeoutNs;
384    this.pauseNs = builder.pauseNs;
385    if (builder.pauseNsForServerOverloaded < builder.pauseNs) {
386      LOG.warn(
387        "Configured value of pauseNsForServerOverloaded is {} ms, which is less than"
388          + " the normal pause value {} ms, use the greater one instead",
389        TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded),
390        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
391      this.pauseNsForServerOverloaded = builder.pauseNs;
392    } else {
393      this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded;
394    }
395    this.maxAttempts = builder.maxAttempts;
396    this.startLogErrorsCnt = builder.startLogErrorsCnt;
397    this.ng = connection.getNonceGenerator();
398  }
399
400  <T> MasterRequestCallerBuilder<T> newMasterCaller() {
401    return this.connection.callerFactory.<T> masterRequest()
402      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
403      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
404      .pause(pauseNs, TimeUnit.NANOSECONDS)
405      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
406      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
407  }
408
409  private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
410    return this.connection.callerFactory.<T> adminRequest()
411      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
412      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
413      .pause(pauseNs, TimeUnit.NANOSECONDS)
414      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
415      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
416  }
417
418  @FunctionalInterface
419  private interface MasterRpcCall<RESP, REQ> {
420    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
421      RpcCallback<RESP> done);
422  }
423
424  @FunctionalInterface
425  private interface AdminRpcCall<RESP, REQ> {
426    void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
427      RpcCallback<RESP> done);
428  }
429
430  @FunctionalInterface
431  private interface Converter<D, S> {
432    D convert(S src) throws IOException;
433  }
434
435  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
436    MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
437    Converter<RESP, PRESP> respConverter) {
438    CompletableFuture<RESP> future = new CompletableFuture<>();
439    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
440
441      @Override
442      public void run(PRESP resp) {
443        if (controller.failed()) {
444          future.completeExceptionally(controller.getFailed());
445        } else {
446          try {
447            future.complete(respConverter.convert(resp));
448          } catch (IOException e) {
449            future.completeExceptionally(e);
450          }
451        }
452      }
453    });
454    return future;
455  }
456
457  private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
458    AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
459    Converter<RESP, PRESP> respConverter) {
460    CompletableFuture<RESP> future = new CompletableFuture<>();
461    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
462
463      @Override
464      public void run(PRESP resp) {
465        if (controller.failed()) {
466          future.completeExceptionally(controller.getFailed());
467        } else {
468          try {
469            future.complete(respConverter.convert(resp));
470          } catch (IOException e) {
471            future.completeExceptionally(e);
472          }
473        }
474      }
475    });
476    return future;
477  }
478
479  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
480    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
481    ProcedureBiConsumer consumer) {
482    return procedureCall(b -> {
483    }, preq, rpcCall, respConverter, consumer);
484  }
485
486  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq,
487    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
488    ProcedureBiConsumer consumer) {
489    return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer);
490  }
491
492  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(
493    Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
494    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
495    ProcedureBiConsumer consumer) {
496    MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller,
497      stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter));
498    prioritySetter.accept(builder);
499    CompletableFuture<Long> procFuture = builder.call();
500    CompletableFuture<Void> future = waitProcedureResult(procFuture);
501    addListener(future, consumer);
502    return future;
503  }
504
505  @FunctionalInterface
506  private interface TableOperator {
507    CompletableFuture<Void> operate(TableName table);
508  }
509
510  @Override
511  public CompletableFuture<Boolean> tableExists(TableName tableName) {
512    if (TableName.isMetaTableName(tableName)) {
513      return CompletableFuture.completedFuture(true);
514    }
515    return ClientMetaTableAccessor.tableExists(metaTable, tableName);
516  }
517
518  @Override
519  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) {
520    return getTableDescriptors(
521      RequestConverter.buildGetTableDescriptorsRequest(null, includeSysTables));
522  }
523
524  /**
525   * {@link #listTableDescriptors(boolean)}
526   */
527  @Override
528  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern,
529    boolean includeSysTables) {
530    Preconditions.checkNotNull(pattern, "pattern is null. If you don't specify a pattern, "
531      + "use listTableDescriptors(boolean) instead");
532    return getTableDescriptors(
533      RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables));
534  }
535
536  @Override
537  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) {
538    Preconditions.checkNotNull(tableNames, "tableNames is null. If you don't specify tableNames, "
539      + "use listTableDescriptors(boolean) instead");
540    if (tableNames.isEmpty()) {
541      return CompletableFuture.completedFuture(Collections.emptyList());
542    }
543    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames));
544  }
545
546  private CompletableFuture<List<TableDescriptor>>
547    getTableDescriptors(GetTableDescriptorsRequest request) {
548    return this.<List<TableDescriptor>> newMasterCaller()
549      .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse,
550        List<TableDescriptor>> call(controller, stub, request,
551          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
552          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
553      .call();
554  }
555
556  @Override
557  public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) {
558    return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables));
559  }
560
561  @Override
562  public CompletableFuture<List<TableName>> listTableNames(Pattern pattern,
563    boolean includeSysTables) {
564    Preconditions.checkNotNull(pattern,
565      "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead");
566    return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables));
567  }
568
569  private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) {
570    return this.<List<TableName>> newMasterCaller()
571      .action((controller, stub) -> this.<GetTableNamesRequest, GetTableNamesResponse,
572        List<TableName>> call(controller, stub, request,
573          (s, c, req, done) -> s.getTableNames(c, req, done),
574          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
575      .call();
576  }
577
578  @Override
579  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) {
580    return this.<List<TableDescriptor>> newMasterCaller()
581      .action((controller, stub) -> this.<ListTableDescriptorsByNamespaceRequest,
582        ListTableDescriptorsByNamespaceResponse, List<TableDescriptor>> call(controller, stub,
583          ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
584          (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done),
585          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
586      .call();
587  }
588
589  @Override
590  public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) {
591    return this.<List<TableName>> newMasterCaller()
592      .action((controller, stub) -> this.<ListTableNamesByNamespaceRequest,
593        ListTableNamesByNamespaceResponse, List<TableName>> call(controller, stub,
594          ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
595          (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done),
596          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList())))
597      .call();
598  }
599
600  @Override
601  public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
602    CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
603    addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName)
604      .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse,
605        List<TableSchema>> call(controller, stub,
606          RequestConverter.buildGetTableDescriptorsRequest(tableName),
607          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
608          (resp) -> resp.getTableSchemaList()))
609      .call(), (tableSchemas, error) -> {
610        if (error != null) {
611          future.completeExceptionally(error);
612          return;
613        }
614        if (!tableSchemas.isEmpty()) {
615          future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
616        } else {
617          future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
618        }
619      });
620    return future;
621  }
622
623  @Override
624  public CompletableFuture<Void> createTable(TableDescriptor desc) {
625    return createTable(desc.getTableName(),
626      RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce()));
627  }
628
629  @Override
630  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
631    int numRegions) {
632    try {
633      return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
634    } catch (IllegalArgumentException e) {
635      return failedFuture(e);
636    }
637  }
638
639  @Override
640  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
641    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
642      + " use createTable(TableDescriptor) instead");
643    try {
644      verifySplitKeys(splitKeys);
645      return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc,
646        splitKeys, ng.getNonceGroup(), ng.newNonce()));
647    } catch (IllegalArgumentException e) {
648      return failedFuture(e);
649    }
650  }
651
652  private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
653    Preconditions.checkNotNull(tableName, "table name is null");
654    return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request,
655      (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
656      new CreateTableProcedureBiConsumer(tableName));
657  }
658
659  @Override
660  public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
661    return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(),
662      RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
663        ng.newNonce()),
664      (s, c, req, done) -> s.modifyTable(c, req, done), (resp) -> resp.getProcId(),
665      new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
666  }
667
668  @Override
669  public CompletableFuture<Void> modifyTableStoreFileTracker(TableName tableName, String dstSFT) {
670    return this.<ModifyTableStoreFileTrackerRequest,
671      ModifyTableStoreFileTrackerResponse> procedureCall(tableName,
672        RequestConverter.buildModifyTableStoreFileTrackerRequest(tableName, dstSFT,
673          ng.getNonceGroup(), ng.newNonce()),
674        (s, c, req, done) -> s.modifyTableStoreFileTracker(c, req, done),
675        (resp) -> resp.getProcId(),
676        new ModifyTableStoreFileTrackerProcedureBiConsumer(this, tableName));
677  }
678
679  @Override
680  public CompletableFuture<Void> deleteTable(TableName tableName) {
681    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName,
682      RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
683      (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
684      new DeleteTableProcedureBiConsumer(tableName));
685  }
686
687  @Override
688  public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
689    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName,
690      RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
691        ng.newNonce()),
692      (s, c, req, done) -> s.truncateTable(c, req, done), (resp) -> resp.getProcId(),
693      new TruncateTableProcedureBiConsumer(tableName));
694  }
695
696  @Override
697  public CompletableFuture<Void> enableTable(TableName tableName) {
698    return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName,
699      RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
700      (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
701      new EnableTableProcedureBiConsumer(tableName));
702  }
703
704  @Override
705  public CompletableFuture<Void> disableTable(TableName tableName) {
706    return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName,
707      RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
708      (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
709      new DisableTableProcedureBiConsumer(tableName));
710  }
711
712  /**
713   * Utility for completing passed TableState {@link CompletableFuture} <code>future</code> using
714   * passed parameters. Sets error or boolean result ('true' if table matches the passed-in
715   * targetState).
716   */
717  private static CompletableFuture<Boolean> completeCheckTableState(
718    CompletableFuture<Boolean> future, TableState tableState, Throwable error,
719    TableState.State targetState, TableName tableName) {
720    if (error != null) {
721      future.completeExceptionally(error);
722    } else {
723      if (tableState != null) {
724        future.complete(tableState.inStates(targetState));
725      } else {
726        future.completeExceptionally(new TableNotFoundException(tableName));
727      }
728    }
729    return future;
730  }
731
732  @Override
733  public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
734    if (TableName.isMetaTableName(tableName)) {
735      return CompletableFuture.completedFuture(true);
736    }
737    CompletableFuture<Boolean> future = new CompletableFuture<>();
738    addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
739      (tableState, error) -> {
740        completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error,
741          TableState.State.ENABLED, tableName);
742      });
743    return future;
744  }
745
746  @Override
747  public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
748    if (TableName.isMetaTableName(tableName)) {
749      return CompletableFuture.completedFuture(false);
750    }
751    CompletableFuture<Boolean> future = new CompletableFuture<>();
752    addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
753      (tableState, error) -> {
754        completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error,
755          TableState.State.DISABLED, tableName);
756      });
757    return future;
758  }
759
760  @Override
761  public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
762    if (TableName.isMetaTableName(tableName)) {
763      return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream
764        .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
765    }
766    CompletableFuture<Boolean> future = new CompletableFuture<>();
767    addListener(isTableEnabled(tableName), (enabled, error) -> {
768      if (error != null) {
769        if (error instanceof TableNotFoundException) {
770          future.complete(false);
771        } else {
772          future.completeExceptionally(error);
773        }
774        return;
775      }
776      if (!enabled) {
777        future.complete(false);
778      } else {
779        addListener(ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName),
780          (locations, error1) -> {
781            if (error1 != null) {
782              future.completeExceptionally(error1);
783              return;
784            }
785            List<HRegionLocation> notDeployedRegions = locations.stream()
786              .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
787            if (notDeployedRegions.size() > 0) {
788              if (LOG.isDebugEnabled()) {
789                LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
790              }
791              future.complete(false);
792              return;
793            }
794            future.complete(true);
795          });
796      }
797    });
798    return future;
799  }
800
801  @Override
802  public CompletableFuture<Void> addColumnFamily(TableName tableName,
803    ColumnFamilyDescriptor columnFamily) {
804    return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
805      RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
806        ng.newNonce()),
807      (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
808      new AddColumnFamilyProcedureBiConsumer(tableName));
809  }
810
811  @Override
812  public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
813    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName,
814      RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
815        ng.newNonce()),
816      (s, c, req, done) -> s.deleteColumn(c, req, done), (resp) -> resp.getProcId(),
817      new DeleteColumnFamilyProcedureBiConsumer(tableName));
818  }
819
820  @Override
821  public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
822    ColumnFamilyDescriptor columnFamily) {
823    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName,
824      RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
825        ng.newNonce()),
826      (s, c, req, done) -> s.modifyColumn(c, req, done), (resp) -> resp.getProcId(),
827      new ModifyColumnFamilyProcedureBiConsumer(tableName));
828  }
829
830  @Override
831  public CompletableFuture<Void> modifyColumnFamilyStoreFileTracker(TableName tableName,
832    byte[] family, String dstSFT) {
833    return this.<ModifyColumnStoreFileTrackerRequest,
834      ModifyColumnStoreFileTrackerResponse> procedureCall(tableName,
835        RequestConverter.buildModifyColumnStoreFileTrackerRequest(tableName, family, dstSFT,
836          ng.getNonceGroup(), ng.newNonce()),
837        (s, c, req, done) -> s.modifyColumnStoreFileTracker(c, req, done),
838        (resp) -> resp.getProcId(),
839        new ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(tableName));
840  }
841
842  @Override
843  public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
844    return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
845      RequestConverter.buildCreateNamespaceRequest(descriptor),
846      (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
847      new CreateNamespaceProcedureBiConsumer(descriptor.getName()));
848  }
849
850  @Override
851  public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
852    return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
853      RequestConverter.buildModifyNamespaceRequest(descriptor),
854      (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
855      new ModifyNamespaceProcedureBiConsumer(descriptor.getName()));
856  }
857
858  @Override
859  public CompletableFuture<Void> deleteNamespace(String name) {
860    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
861      RequestConverter.buildDeleteNamespaceRequest(name),
862      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
863      new DeleteNamespaceProcedureBiConsumer(name));
864  }
865
866  @Override
867  public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
868    return this.<NamespaceDescriptor> newMasterCaller()
869      .action((controller, stub) -> this.<GetNamespaceDescriptorRequest,
870        GetNamespaceDescriptorResponse, NamespaceDescriptor> call(controller, stub,
871          RequestConverter.buildGetNamespaceDescriptorRequest(name),
872          (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done),
873          (resp) -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor())))
874      .call();
875  }
876
877  @Override
878  public CompletableFuture<List<String>> listNamespaces() {
879    return this.<List<String>> newMasterCaller()
880      .action((controller, stub) -> this.<ListNamespacesRequest, ListNamespacesResponse,
881        List<String>> call(controller, stub, ListNamespacesRequest.newBuilder().build(),
882          (s, c, req, done) -> s.listNamespaces(c, req, done),
883          (resp) -> resp.getNamespaceNameList()))
884      .call();
885  }
886
887  @Override
888  public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
889    return this.<List<NamespaceDescriptor>> newMasterCaller()
890      .action((controller, stub) -> this.<ListNamespaceDescriptorsRequest,
891        ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(controller, stub,
892          ListNamespaceDescriptorsRequest.newBuilder().build(),
893          (s, c, req, done) -> s.listNamespaceDescriptors(c, req, done),
894          (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp)))
895      .call();
896  }
897
898  @Override
899  public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
900    return this.<List<RegionInfo>> newAdminCaller()
901      .action((controller, stub) -> this.<GetOnlineRegionRequest, GetOnlineRegionResponse,
902        List<RegionInfo>> adminCall(controller, stub,
903          RequestConverter.buildGetOnlineRegionRequest(),
904          (s, c, req, done) -> s.getOnlineRegion(c, req, done),
905          resp -> ProtobufUtil.getRegionInfos(resp)))
906      .serverName(serverName).call();
907  }
908
909  @Override
910  public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
911    if (tableName.equals(META_TABLE_NAME)) {
912      return connection.registry.getMetaRegionLocations()
913        .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
914          .collect(Collectors.toList()));
915    } else {
916      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).thenApply(
917        locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
918    }
919  }
920
921  @Override
922  public CompletableFuture<Void> flush(TableName tableName) {
923    return flush(tableName, null);
924  }
925
926  @Override
927  public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
928    CompletableFuture<Void> future = new CompletableFuture<>();
929    addListener(tableExists(tableName), (exists, err) -> {
930      if (err != null) {
931        future.completeExceptionally(err);
932      } else if (!exists) {
933        future.completeExceptionally(new TableNotFoundException(tableName));
934      } else {
935        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
936          if (err2 != null) {
937            future.completeExceptionally(err2);
938          } else if (!tableEnabled) {
939            future.completeExceptionally(new TableNotEnabledException(tableName));
940          } else {
941            Map<String, String> props = new HashMap<>();
942            if (columnFamily != null) {
943              props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily));
944            }
945            addListener(
946              execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props),
947              (ret, err3) -> {
948                if (err3 != null) {
949                  future.completeExceptionally(err3);
950                } else {
951                  future.complete(ret);
952                }
953              });
954          }
955        });
956      }
957    });
958    return future;
959  }
960
961  @Override
962  public CompletableFuture<Void> flushRegion(byte[] regionName) {
963    return flushRegionInternal(regionName, null, false).thenAccept(r -> {
964    });
965  }
966
967  @Override
968  public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) {
969    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
970      + "If you don't specify a columnFamily, use flushRegion(regionName) instead");
971    return flushRegionInternal(regionName, columnFamily, false).thenAccept(r -> {
972    });
973  }
974
975  /**
976   * This method is for internal use only, where we need the response of the flush.
977   * <p/>
978   * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public
979   * API.
980   */
981  CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName, byte[] columnFamily,
982    boolean writeFlushWALMarker) {
983    CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>();
984    addListener(getRegionLocation(regionName), (location, err) -> {
985      if (err != null) {
986        future.completeExceptionally(err);
987        return;
988      }
989      ServerName serverName = location.getServerName();
990      if (serverName == null) {
991        future
992          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
993        return;
994      }
995      addListener(flush(serverName, location.getRegion(), columnFamily, writeFlushWALMarker),
996        (ret, err2) -> {
997          if (err2 != null) {
998            future.completeExceptionally(err2);
999          } else {
1000            future.complete(ret);
1001          }
1002        });
1003    });
1004    return future;
1005  }
1006
1007  private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo,
1008    byte[] columnFamily, boolean writeFlushWALMarker) {
1009    return this.<FlushRegionResponse> newAdminCaller().serverName(serverName)
1010      .action((controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse,
1011        FlushRegionResponse> adminCall(controller, stub,
1012          RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), columnFamily,
1013            writeFlushWALMarker),
1014          (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp))
1015      .call();
1016  }
1017
1018  @Override
1019  public CompletableFuture<Void> flushRegionServer(ServerName sn) {
1020    CompletableFuture<Void> future = new CompletableFuture<>();
1021    addListener(getRegions(sn), (hRegionInfos, err) -> {
1022      if (err != null) {
1023        future.completeExceptionally(err);
1024        return;
1025      }
1026      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1027      if (hRegionInfos != null) {
1028        hRegionInfos
1029          .forEach(region -> compactFutures.add(flush(sn, region, null, false).thenAccept(r -> {
1030          })));
1031      }
1032      addListener(CompletableFuture.allOf(
1033        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1034          if (err2 != null) {
1035            future.completeExceptionally(err2);
1036          } else {
1037            future.complete(ret);
1038          }
1039        });
1040    });
1041    return future;
1042  }
1043
1044  @Override
1045  public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
1046    return compact(tableName, null, false, compactType);
1047  }
1048
1049  @Override
1050  public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
1051    CompactType compactType) {
1052    Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
1053      + "If you don't specify a columnFamily, use compact(TableName) instead");
1054    return compact(tableName, columnFamily, false, compactType);
1055  }
1056
1057  @Override
1058  public CompletableFuture<Void> compactRegion(byte[] regionName) {
1059    return compactRegion(regionName, null, false);
1060  }
1061
1062  @Override
1063  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
1064    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1065      + " If you don't specify a columnFamily, use compactRegion(regionName) instead");
1066    return compactRegion(regionName, columnFamily, false);
1067  }
1068
1069  @Override
1070  public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
1071    return compact(tableName, null, true, compactType);
1072  }
1073
1074  @Override
1075  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
1076    CompactType compactType) {
1077    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1078      + "If you don't specify a columnFamily, use compact(TableName) instead");
1079    return compact(tableName, columnFamily, true, compactType);
1080  }
1081
1082  @Override
1083  public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
1084    return compactRegion(regionName, null, true);
1085  }
1086
1087  @Override
1088  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
1089    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1090      + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead");
1091    return compactRegion(regionName, columnFamily, true);
1092  }
1093
1094  @Override
1095  public CompletableFuture<Void> compactRegionServer(ServerName sn) {
1096    return compactRegionServer(sn, false);
1097  }
1098
1099  @Override
1100  public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
1101    return compactRegionServer(sn, true);
1102  }
1103
1104  private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
1105    CompletableFuture<Void> future = new CompletableFuture<>();
1106    addListener(getRegions(sn), (hRegionInfos, err) -> {
1107      if (err != null) {
1108        future.completeExceptionally(err);
1109        return;
1110      }
1111      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1112      if (hRegionInfos != null) {
1113        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
1114      }
1115      addListener(CompletableFuture.allOf(
1116        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1117          if (err2 != null) {
1118            future.completeExceptionally(err2);
1119          } else {
1120            future.complete(ret);
1121          }
1122        });
1123    });
1124    return future;
1125  }
1126
1127  private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
1128    boolean major) {
1129    CompletableFuture<Void> future = new CompletableFuture<>();
1130    addListener(getRegionLocation(regionName), (location, err) -> {
1131      if (err != null) {
1132        future.completeExceptionally(err);
1133        return;
1134      }
1135      ServerName serverName = location.getServerName();
1136      if (serverName == null) {
1137        future
1138          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1139        return;
1140      }
1141      addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
1142        (ret, err2) -> {
1143          if (err2 != null) {
1144            future.completeExceptionally(err2);
1145          } else {
1146            future.complete(ret);
1147          }
1148        });
1149    });
1150    return future;
1151  }
1152
1153  /**
1154   * List all region locations for the specific table.
1155   */
1156  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
1157    if (TableName.META_TABLE_NAME.equals(tableName)) {
1158      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
1159      addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
1160        if (err != null) {
1161          future.completeExceptionally(err);
1162        } else if (
1163          metaRegions == null || metaRegions.isEmpty()
1164            || metaRegions.getDefaultRegionLocation() == null
1165        ) {
1166          future.completeExceptionally(new IOException("meta region does not found"));
1167        } else {
1168          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
1169        }
1170      });
1171      return future;
1172    } else {
1173      // For non-meta table, we fetch all locations by scanning hbase:meta table
1174      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName);
1175    }
1176  }
1177
1178  /**
1179   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
1180   */
1181  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
1182    CompactType compactType) {
1183    CompletableFuture<Void> future = new CompletableFuture<>();
1184
1185    switch (compactType) {
1186      case MOB:
1187        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
1188          if (err != null) {
1189            future.completeExceptionally(err);
1190            return;
1191          }
1192          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
1193          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
1194            if (err2 != null) {
1195              future.completeExceptionally(err2);
1196            } else {
1197              future.complete(ret);
1198            }
1199          });
1200        });
1201        break;
1202      case NORMAL:
1203        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
1204          if (err != null) {
1205            future.completeExceptionally(err);
1206            return;
1207          }
1208          if (locations == null || locations.isEmpty()) {
1209            future.completeExceptionally(new TableNotFoundException(tableName));
1210          }
1211          CompletableFuture<?>[] compactFutures =
1212            locations.stream().filter(l -> l.getRegion() != null)
1213              .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
1214              .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
1215              .toArray(CompletableFuture<?>[]::new);
1216          // future complete unless all of the compact futures are completed.
1217          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
1218            if (err2 != null) {
1219              future.completeExceptionally(err2);
1220            } else {
1221              future.complete(ret);
1222            }
1223          });
1224        });
1225        break;
1226      default:
1227        throw new IllegalArgumentException("Unknown compactType: " + compactType);
1228    }
1229    return future;
1230  }
1231
1232  /**
1233   * Compact the region at specific region server.
1234   */
1235  private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
1236    final boolean major, byte[] columnFamily) {
1237    return this.<Void> newAdminCaller().serverName(sn)
1238      .action((controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse,
1239        Void> adminCall(controller, stub,
1240          RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, columnFamily),
1241          (s, c, req, done) -> s.compactRegion(c, req, done), resp -> null))
1242      .call();
1243  }
1244
1245  private byte[] toEncodeRegionName(byte[] regionName) {
1246    return RegionInfo.isEncodedRegionName(regionName)
1247      ? regionName
1248      : Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
1249  }
1250
1251  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
1252    CompletableFuture<TableName> result) {
1253    addListener(getRegionLocation(encodeRegionName), (location, err) -> {
1254      if (err != null) {
1255        result.completeExceptionally(err);
1256        return;
1257      }
1258      RegionInfo regionInfo = location.getRegion();
1259      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1260        result.completeExceptionally(
1261          new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
1262        return;
1263      }
1264      if (!tableName.compareAndSet(null, regionInfo.getTable())) {
1265        if (!tableName.get().equals(regionInfo.getTable())) {
1266          // tables of this two region should be same.
1267          result.completeExceptionally(
1268            new IllegalArgumentException("Cannot merge regions from two different tables "
1269              + tableName.get() + " and " + regionInfo.getTable()));
1270        } else {
1271          result.complete(tableName.get());
1272        }
1273      }
1274    });
1275  }
1276
1277  private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) {
1278    AtomicReference<TableName> tableNameRef = new AtomicReference<>();
1279    CompletableFuture<TableName> future = new CompletableFuture<>();
1280    for (byte[] encodedRegionName : encodedRegionNames) {
1281      checkAndGetTableName(encodedRegionName, tableNameRef, future);
1282    }
1283    return future;
1284  }
1285
1286  @Override
1287  public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) {
1288    return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE);
1289  }
1290
1291  @Override
1292  public CompletableFuture<Boolean> isMergeEnabled() {
1293    return isSplitOrMergeOn(MasterSwitchType.MERGE);
1294  }
1295
1296  @Override
1297  public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) {
1298    return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT);
1299  }
1300
1301  @Override
1302  public CompletableFuture<Boolean> isSplitEnabled() {
1303    return isSplitOrMergeOn(MasterSwitchType.SPLIT);
1304  }
1305
1306  private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous,
1307    MasterSwitchType switchType) {
1308    SetSplitOrMergeEnabledRequest request =
1309      RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType);
1310    return this.<Boolean> newMasterCaller()
1311      .action((controller, stub) -> this.<SetSplitOrMergeEnabledRequest,
1312        SetSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request,
1313          (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done),
1314          (resp) -> resp.getPrevValueList().get(0)))
1315      .call();
1316  }
1317
1318  private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) {
1319    IsSplitOrMergeEnabledRequest request =
1320      RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType);
1321    return this.<Boolean> newMasterCaller()
1322      .action((controller, stub) -> this.<IsSplitOrMergeEnabledRequest,
1323        IsSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request,
1324          (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done), (resp) -> resp.getEnabled()))
1325      .call();
1326  }
1327
1328  @Override
1329  public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) {
1330    if (nameOfRegionsToMerge.size() < 2) {
1331      return failedFuture(new IllegalArgumentException(
1332        "Can not merge only " + nameOfRegionsToMerge.size() + " region"));
1333    }
1334    CompletableFuture<Void> future = new CompletableFuture<>();
1335    byte[][] encodedNameOfRegionsToMerge =
1336      nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new);
1337
1338    addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> {
1339      if (err != null) {
1340        future.completeExceptionally(err);
1341        return;
1342      }
1343
1344      final MergeTableRegionsRequest request;
1345      try {
1346        request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
1347          forcible, ng.getNonceGroup(), ng.newNonce());
1348      } catch (DeserializationException e) {
1349        future.completeExceptionally(e);
1350        return;
1351      }
1352
1353      addListener(
1354        this.procedureCall(tableName, request, MasterService.Interface::mergeTableRegions,
1355          MergeTableRegionsResponse::getProcId, new MergeTableRegionProcedureBiConsumer(tableName)),
1356        (ret, err2) -> {
1357          if (err2 != null) {
1358            future.completeExceptionally(err2);
1359          } else {
1360            future.complete(ret);
1361          }
1362        });
1363    });
1364    return future;
1365  }
1366
1367  @Override
1368  public CompletableFuture<Void> split(TableName tableName) {
1369    CompletableFuture<Void> future = new CompletableFuture<>();
1370    addListener(tableExists(tableName), (exist, error) -> {
1371      if (error != null) {
1372        future.completeExceptionally(error);
1373        return;
1374      }
1375      if (!exist) {
1376        future.completeExceptionally(new TableNotFoundException(tableName));
1377        return;
1378      }
1379      addListener(metaTable
1380        .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
1381          .withStartRow(ClientMetaTableAccessor.getTableStartRowForMeta(tableName,
1382            ClientMetaTableAccessor.QueryType.REGION))
1383          .withStopRow(ClientMetaTableAccessor.getTableStopRowForMeta(tableName,
1384            ClientMetaTableAccessor.QueryType.REGION))),
1385        (results, err2) -> {
1386          if (err2 != null) {
1387            future.completeExceptionally(err2);
1388            return;
1389          }
1390          if (results != null && !results.isEmpty()) {
1391            List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
1392            for (Result r : results) {
1393              if (r.isEmpty() || CatalogFamilyFormat.getRegionInfo(r) == null) {
1394                continue;
1395              }
1396              RegionLocations rl = CatalogFamilyFormat.getRegionLocations(r);
1397              if (rl != null) {
1398                for (HRegionLocation h : rl.getRegionLocations()) {
1399                  if (h != null && h.getServerName() != null) {
1400                    RegionInfo hri = h.getRegion();
1401                    if (
1402                      hri == null || hri.isSplitParent()
1403                        || hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID
1404                    ) {
1405                      continue;
1406                    }
1407                    splitFutures.add(split(hri, null));
1408                  }
1409                }
1410              }
1411            }
1412            addListener(
1413              CompletableFuture
1414                .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
1415              (ret, exception) -> {
1416                if (exception != null) {
1417                  future.completeExceptionally(exception);
1418                  return;
1419                }
1420                future.complete(ret);
1421              });
1422          } else {
1423            future.complete(null);
1424          }
1425        });
1426    });
1427    return future;
1428  }
1429
1430  @Override
1431  public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
1432    CompletableFuture<Void> result = new CompletableFuture<>();
1433    if (splitPoint == null) {
1434      return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
1435    }
1436    addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true),
1437      (loc, err) -> {
1438        if (err != null) {
1439          result.completeExceptionally(err);
1440        } else if (loc == null || loc.getRegion() == null) {
1441          result.completeExceptionally(new IllegalArgumentException(
1442            "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
1443        } else {
1444          addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
1445            if (err2 != null) {
1446              result.completeExceptionally(err2);
1447            } else {
1448              result.complete(ret);
1449            }
1450
1451          });
1452        }
1453      });
1454    return result;
1455  }
1456
1457  @Override
1458  public CompletableFuture<Void> splitRegion(byte[] regionName) {
1459    CompletableFuture<Void> future = new CompletableFuture<>();
1460    addListener(getRegionLocation(regionName), (location, err) -> {
1461      if (err != null) {
1462        future.completeExceptionally(err);
1463        return;
1464      }
1465      RegionInfo regionInfo = location.getRegion();
1466      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1467        future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. "
1468          + "Replicas are auto-split when their primary is split."));
1469        return;
1470      }
1471      ServerName serverName = location.getServerName();
1472      if (serverName == null) {
1473        future
1474          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1475        return;
1476      }
1477      addListener(split(regionInfo, null), (ret, err2) -> {
1478        if (err2 != null) {
1479          future.completeExceptionally(err2);
1480        } else {
1481          future.complete(ret);
1482        }
1483      });
1484    });
1485    return future;
1486  }
1487
1488  @Override
1489  public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) {
1490    Preconditions.checkNotNull(splitPoint,
1491      "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
1492    CompletableFuture<Void> future = new CompletableFuture<>();
1493    addListener(getRegionLocation(regionName), (location, err) -> {
1494      if (err != null) {
1495        future.completeExceptionally(err);
1496        return;
1497      }
1498      RegionInfo regionInfo = location.getRegion();
1499      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1500        future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. "
1501          + "Replicas are auto-split when their primary is split."));
1502        return;
1503      }
1504      ServerName serverName = location.getServerName();
1505      if (serverName == null) {
1506        future
1507          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1508        return;
1509      }
1510      if (
1511        regionInfo.getStartKey() != null
1512          && Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0
1513      ) {
1514        future.completeExceptionally(
1515          new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
1516        return;
1517      }
1518      addListener(split(regionInfo, splitPoint), (ret, err2) -> {
1519        if (err2 != null) {
1520          future.completeExceptionally(err2);
1521        } else {
1522          future.complete(ret);
1523        }
1524      });
1525    });
1526    return future;
1527  }
1528
1529  private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
1530    CompletableFuture<Void> future = new CompletableFuture<>();
1531    TableName tableName = hri.getTable();
1532    final SplitTableRegionRequest request;
1533    try {
1534      request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
1535        ng.newNonce());
1536    } catch (DeserializationException e) {
1537      future.completeExceptionally(e);
1538      return future;
1539    }
1540
1541    addListener(
1542      this.procedureCall(tableName, request, MasterService.Interface::splitRegion,
1543        SplitTableRegionResponse::getProcId, new SplitTableRegionProcedureBiConsumer(tableName)),
1544      (ret, err2) -> {
1545        if (err2 != null) {
1546          future.completeExceptionally(err2);
1547        } else {
1548          future.complete(ret);
1549        }
1550      });
1551    return future;
1552  }
1553
1554  @Override
1555  public CompletableFuture<Void> assign(byte[] regionName) {
1556    CompletableFuture<Void> future = new CompletableFuture<>();
1557    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1558      if (err != null) {
1559        future.completeExceptionally(err);
1560        return;
1561      }
1562      addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable())
1563        .action(((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
1564          controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
1565          (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)))
1566        .call(), (ret, err2) -> {
1567          if (err2 != null) {
1568            future.completeExceptionally(err2);
1569          } else {
1570            future.complete(ret);
1571          }
1572        });
1573    });
1574    return future;
1575  }
1576
1577  @Override
1578  public CompletableFuture<Void> unassign(byte[] regionName) {
1579    CompletableFuture<Void> future = new CompletableFuture<>();
1580    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1581      if (err != null) {
1582        future.completeExceptionally(err);
1583        return;
1584      }
1585      addListener(
1586        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1587          .action(((controller, stub) -> this.<UnassignRegionRequest, UnassignRegionResponse,
1588            Void> call(controller, stub,
1589              RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()),
1590              (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)))
1591          .call(),
1592        (ret, err2) -> {
1593          if (err2 != null) {
1594            future.completeExceptionally(err2);
1595          } else {
1596            future.complete(ret);
1597          }
1598        });
1599    });
1600    return future;
1601  }
1602
1603  @Override
1604  public CompletableFuture<Void> offline(byte[] regionName) {
1605    CompletableFuture<Void> future = new CompletableFuture<>();
1606    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1607      if (err != null) {
1608        future.completeExceptionally(err);
1609        return;
1610      }
1611      addListener(
1612        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1613          .action(((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse,
1614            Void> call(controller, stub,
1615              RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
1616              (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)))
1617          .call(),
1618        (ret, err2) -> {
1619          if (err2 != null) {
1620            future.completeExceptionally(err2);
1621          } else {
1622            future.complete(ret);
1623          }
1624        });
1625    });
1626    return future;
1627  }
1628
1629  @Override
1630  public CompletableFuture<Void> move(byte[] regionName) {
1631    CompletableFuture<Void> future = new CompletableFuture<>();
1632    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1633      if (err != null) {
1634        future.completeExceptionally(err);
1635        return;
1636      }
1637      addListener(
1638        moveRegion(regionInfo,
1639          RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
1640        (ret, err2) -> {
1641          if (err2 != null) {
1642            future.completeExceptionally(err2);
1643          } else {
1644            future.complete(ret);
1645          }
1646        });
1647    });
1648    return future;
1649  }
1650
1651  @Override
1652  public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) {
1653    Preconditions.checkNotNull(destServerName,
1654      "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
1655    CompletableFuture<Void> future = new CompletableFuture<>();
1656    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1657      if (err != null) {
1658        future.completeExceptionally(err);
1659        return;
1660      }
1661      addListener(
1662        moveRegion(regionInfo, RequestConverter
1663          .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
1664        (ret, err2) -> {
1665          if (err2 != null) {
1666            future.completeExceptionally(err2);
1667          } else {
1668            future.complete(ret);
1669          }
1670        });
1671    });
1672    return future;
1673  }
1674
1675  private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) {
1676    return this.<Void> newMasterCaller().priority(regionInfo.getTable())
1677      .action(
1678        (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
1679          stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null))
1680      .call();
1681  }
1682
1683  @Override
1684  public CompletableFuture<Void> setQuota(QuotaSettings quota) {
1685    return this.<Void> newMasterCaller()
1686      .action((controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
1687        stub, QuotaSettings.buildSetQuotaRequestProto(quota),
1688        (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null))
1689      .call();
1690  }
1691
1692  @Override
1693  public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
1694    CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
1695    Scan scan = QuotaTableUtil.makeScan(filter);
1696    this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build().scan(scan,
1697      new AdvancedScanResultConsumer() {
1698        List<QuotaSettings> settings = new ArrayList<>();
1699
1700        @Override
1701        public void onNext(Result[] results, ScanController controller) {
1702          for (Result result : results) {
1703            try {
1704              QuotaTableUtil.parseResultToCollection(result, settings);
1705            } catch (IOException e) {
1706              controller.terminate();
1707              future.completeExceptionally(e);
1708            }
1709          }
1710        }
1711
1712        @Override
1713        public void onError(Throwable error) {
1714          future.completeExceptionally(error);
1715        }
1716
1717        @Override
1718        public void onComplete() {
1719          future.complete(settings);
1720        }
1721      });
1722    return future;
1723  }
1724
1725  @Override
1726  public CompletableFuture<Void> addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig,
1727    boolean enabled) {
1728    return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
1729      RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
1730      (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1731      new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
1732  }
1733
1734  @Override
1735  public CompletableFuture<Void> removeReplicationPeer(String peerId) {
1736    return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall(
1737      RequestConverter.buildRemoveReplicationPeerRequest(peerId),
1738      (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1739      new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER"));
1740  }
1741
1742  @Override
1743  public CompletableFuture<Void> enableReplicationPeer(String peerId) {
1744    return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall(
1745      RequestConverter.buildEnableReplicationPeerRequest(peerId),
1746      (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1747      new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
1748  }
1749
1750  @Override
1751  public CompletableFuture<Void> disableReplicationPeer(String peerId) {
1752    return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall(
1753      RequestConverter.buildDisableReplicationPeerRequest(peerId),
1754      (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1755      new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
1756  }
1757
1758  @Override
1759  public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
1760    return this.<ReplicationPeerConfig> newMasterCaller()
1761      .action((controller, stub) -> this.<GetReplicationPeerConfigRequest,
1762        GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(controller, stub,
1763          RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
1764          (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
1765          (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig())))
1766      .call();
1767  }
1768
1769  @Override
1770  public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
1771    ReplicationPeerConfig peerConfig) {
1772    return this.<UpdateReplicationPeerConfigRequest,
1773      UpdateReplicationPeerConfigResponse> procedureCall(
1774        RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
1775        (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
1776        (resp) -> resp.getProcId(),
1777        new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
1778  }
1779
1780  @Override
1781  public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId,
1782    SyncReplicationState clusterState) {
1783    return this.<TransitReplicationPeerSyncReplicationStateRequest,
1784      TransitReplicationPeerSyncReplicationStateResponse> procedureCall(
1785        RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId,
1786          clusterState),
1787        (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done),
1788        (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId,
1789          () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE"));
1790  }
1791
1792  @Override
1793  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
1794    Map<TableName, List<String>> tableCfs) {
1795    if (tableCfs == null) {
1796      return failedFuture(new ReplicationException("tableCfs is null"));
1797    }
1798
1799    CompletableFuture<Void> future = new CompletableFuture<Void>();
1800    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1801      if (!completeExceptionally(future, error)) {
1802        ReplicationPeerConfig newPeerConfig =
1803          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
1804        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1805          if (!completeExceptionally(future, error)) {
1806            future.complete(result);
1807          }
1808        });
1809      }
1810    });
1811    return future;
1812  }
1813
1814  @Override
1815  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
1816    Map<TableName, List<String>> tableCfs) {
1817    if (tableCfs == null) {
1818      return failedFuture(new ReplicationException("tableCfs is null"));
1819    }
1820
1821    CompletableFuture<Void> future = new CompletableFuture<Void>();
1822    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1823      if (!completeExceptionally(future, error)) {
1824        ReplicationPeerConfig newPeerConfig = null;
1825        try {
1826          newPeerConfig = ReplicationPeerConfigUtil
1827            .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
1828        } catch (ReplicationException e) {
1829          future.completeExceptionally(e);
1830          return;
1831        }
1832        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1833          if (!completeExceptionally(future, error)) {
1834            future.complete(result);
1835          }
1836        });
1837      }
1838    });
1839    return future;
1840  }
1841
1842  @Override
1843  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() {
1844    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null));
1845  }
1846
1847  @Override
1848  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) {
1849    Preconditions.checkNotNull(pattern,
1850      "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead");
1851    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern));
1852  }
1853
1854  private CompletableFuture<List<ReplicationPeerDescription>>
1855    listReplicationPeers(ListReplicationPeersRequest request) {
1856    return this.<List<ReplicationPeerDescription>> newMasterCaller()
1857      .action((controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse,
1858        List<ReplicationPeerDescription>> call(controller, stub, request,
1859          (s, c, req, done) -> s.listReplicationPeers(c, req, done),
1860          (resp) -> resp.getPeerDescList().stream()
1861            .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
1862            .collect(Collectors.toList())))
1863      .call();
1864  }
1865
1866  @Override
1867  public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
1868    CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
1869    addListener(listTableDescriptors(), (tables, error) -> {
1870      if (!completeExceptionally(future, error)) {
1871        List<TableCFs> replicatedTableCFs = new ArrayList<>();
1872        tables.forEach(table -> {
1873          Map<String, Integer> cfs = new HashMap<>();
1874          Stream.of(table.getColumnFamilies())
1875            .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
1876            .forEach(column -> {
1877              cfs.put(column.getNameAsString(), column.getScope());
1878            });
1879          if (!cfs.isEmpty()) {
1880            replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
1881          }
1882        });
1883        future.complete(replicatedTableCFs);
1884      }
1885    });
1886    return future;
1887  }
1888
1889  @Override
1890  public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
1891    SnapshotProtos.SnapshotDescription snapshot =
1892      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
1893    try {
1894      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
1895    } catch (IllegalArgumentException e) {
1896      return failedFuture(e);
1897    }
1898    CompletableFuture<Void> future = new CompletableFuture<>();
1899    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot)
1900      .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build();
1901    addListener(this.<SnapshotResponse> newMasterCaller()
1902      .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, SnapshotResponse> call(
1903        controller, stub, request, (s, c, req, done) -> s.snapshot(c, req, done), resp -> resp))
1904      .call(), (resp, err) -> {
1905        if (err != null) {
1906          future.completeExceptionally(err);
1907          return;
1908        }
1909        waitSnapshotFinish(snapshotDesc, future, resp);
1910      });
1911    return future;
1912  }
1913
1914  // This is for keeping compatibility with old implementation.
1915  // If there is a procId field in the response, then the snapshot will be operated with a
1916  // SnapshotProcedure, otherwise the snapshot will be coordinated by zk.
1917  private void waitSnapshotFinish(SnapshotDescription snapshot, CompletableFuture<Void> future,
1918    SnapshotResponse resp) {
1919    if (resp.hasProcId()) {
1920      getProcedureResult(resp.getProcId(), future, 0);
1921      addListener(future, new SnapshotProcedureBiConsumer(snapshot.getTableName()));
1922    } else {
1923      long expectedTimeout = resp.getExpectedTimeout();
1924      TimerTask pollingTask = new TimerTask() {
1925        int tries = 0;
1926        long startTime = EnvironmentEdgeManager.currentTime();
1927        long endTime = startTime + expectedTimeout;
1928        long maxPauseTime = expectedTimeout / maxAttempts;
1929
1930        @Override
1931        public void run(Timeout timeout) throws Exception {
1932          if (EnvironmentEdgeManager.currentTime() < endTime) {
1933            addListener(isSnapshotFinished(snapshot), (done, err2) -> {
1934              if (err2 != null) {
1935                future.completeExceptionally(err2);
1936              } else if (done) {
1937                future.complete(null);
1938              } else {
1939                // retry again after pauseTime.
1940                long pauseTime =
1941                  ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
1942                pauseTime = Math.min(pauseTime, maxPauseTime);
1943                AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, TimeUnit.MILLISECONDS);
1944              }
1945            });
1946          } else {
1947            future
1948              .completeExceptionally(new SnapshotCreationException("Snapshot '" + snapshot.getName()
1949                + "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshot));
1950          }
1951        }
1952      };
1953      AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
1954    }
1955  }
1956
1957  @Override
1958  public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
1959    return this.<Boolean> newMasterCaller()
1960      .action((controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse,
1961        Boolean> call(controller, stub,
1962          IsSnapshotDoneRequest.newBuilder()
1963            .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
1964          (s, c, req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone()))
1965      .call();
1966  }
1967
1968  @Override
1969  public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
1970    boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
1971      HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
1972      HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
1973    return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
1974  }
1975
1976  @Override
1977  public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot,
1978    boolean restoreAcl) {
1979    CompletableFuture<Void> future = new CompletableFuture<>();
1980    addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
1981      if (err != null) {
1982        future.completeExceptionally(err);
1983        return;
1984      }
1985      TableName tableName = null;
1986      if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
1987        for (SnapshotDescription snap : snapshotDescriptions) {
1988          if (snap.getName().equals(snapshotName)) {
1989            tableName = snap.getTableName();
1990            break;
1991          }
1992        }
1993      }
1994      if (tableName == null) {
1995        future.completeExceptionally(new RestoreSnapshotException(
1996          "Unable to find the table name for snapshot=" + snapshotName));
1997        return;
1998      }
1999      final TableName finalTableName = tableName;
2000      addListener(tableExists(finalTableName), (exists, err2) -> {
2001        if (err2 != null) {
2002          future.completeExceptionally(err2);
2003        } else if (!exists) {
2004          // if table does not exist, then just clone snapshot into new table.
2005          completeConditionalOnFuture(future,
2006            internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl, null));
2007        } else {
2008          addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
2009            if (err4 != null) {
2010              future.completeExceptionally(err4);
2011            } else if (!disabled) {
2012              future.completeExceptionally(new TableNotDisabledException(finalTableName));
2013            } else {
2014              completeConditionalOnFuture(future,
2015                restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl));
2016            }
2017          });
2018        }
2019      });
2020    });
2021    return future;
2022  }
2023
2024  private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
2025    boolean takeFailSafeSnapshot, boolean restoreAcl) {
2026    if (takeFailSafeSnapshot) {
2027      CompletableFuture<Void> future = new CompletableFuture<>();
2028      // Step.1 Take a snapshot of the current state
2029      String failSafeSnapshotSnapshotNameFormat =
2030        this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
2031          HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
2032      final String failSafeSnapshotSnapshotName =
2033        failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
2034          .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
2035          .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
2036      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2037      addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
2038        if (err != null) {
2039          future.completeExceptionally(err);
2040        } else {
2041          // Step.2 Restore snapshot
2042          addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null),
2043            (void2, err2) -> {
2044              if (err2 != null) {
2045                // Step.3.a Something went wrong during the restore and try to rollback.
2046                addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName,
2047                  restoreAcl, null), (void3, err3) -> {
2048                    if (err3 != null) {
2049                      future.completeExceptionally(err3);
2050                    } else {
2051                      String msg =
2052                        "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot="
2053                          + failSafeSnapshotSnapshotName + " succeeded.";
2054                      future.completeExceptionally(new RestoreSnapshotException(msg, err2));
2055                    }
2056                  });
2057              } else {
2058                // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
2059                LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2060                addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
2061                  if (err3 != null) {
2062                    LOG.error(
2063                      "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
2064                      err3);
2065                  }
2066                  future.complete(ret3);
2067                });
2068              }
2069            });
2070        }
2071      });
2072      return future;
2073    } else {
2074      return internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null);
2075    }
2076  }
2077
2078  private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
2079    CompletableFuture<T> parentFuture) {
2080    addListener(parentFuture, (res, err) -> {
2081      if (err != null) {
2082        dependentFuture.completeExceptionally(err);
2083      } else {
2084        dependentFuture.complete(res);
2085      }
2086    });
2087  }
2088
2089  @Override
2090  public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName,
2091    boolean restoreAcl, String customSFT) {
2092    CompletableFuture<Void> future = new CompletableFuture<>();
2093    addListener(tableExists(tableName), (exists, err) -> {
2094      if (err != null) {
2095        future.completeExceptionally(err);
2096      } else if (exists) {
2097        future.completeExceptionally(new TableExistsException(tableName));
2098      } else {
2099        completeConditionalOnFuture(future,
2100          internalRestoreSnapshot(snapshotName, tableName, restoreAcl, customSFT));
2101      }
2102    });
2103    return future;
2104  }
2105
2106  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName,
2107    boolean restoreAcl, String customSFT) {
2108    SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
2109      .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2110    try {
2111      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2112    } catch (IllegalArgumentException e) {
2113      return failedFuture(e);
2114    }
2115    RestoreSnapshotRequest.Builder builder =
2116      RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
2117        .setNonce(ng.newNonce()).setRestoreACL(restoreAcl);
2118    if (customSFT != null) {
2119      builder.setCustomSFT(customSFT);
2120    }
2121    return waitProcedureResult(this.<Long> newMasterCaller()
2122      .action((controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse,
2123        Long> call(controller, stub, builder.build(),
2124          (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId()))
2125      .call());
2126  }
2127
2128  @Override
2129  public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
2130    return getCompletedSnapshots(null);
2131  }
2132
2133  @Override
2134  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
2135    Preconditions.checkNotNull(pattern,
2136      "pattern is null. If you don't specify a pattern, use listSnapshots() instead");
2137    return getCompletedSnapshots(pattern);
2138  }
2139
2140  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) {
2141    return this.<List<SnapshotDescription>> newMasterCaller()
2142      .action((controller, stub) -> this.<GetCompletedSnapshotsRequest,
2143        GetCompletedSnapshotsResponse, List<SnapshotDescription>> call(controller, stub,
2144          GetCompletedSnapshotsRequest.newBuilder().build(),
2145          (s, c, req, done) -> s.getCompletedSnapshots(c, req, done),
2146          resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern)))
2147      .call();
2148  }
2149
2150  @Override
2151  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) {
2152    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2153      + " If you don't specify a tableNamePattern, use listSnapshots() instead");
2154    return getCompletedSnapshots(tableNamePattern, null);
2155  }
2156
2157  @Override
2158  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
2159    Pattern snapshotNamePattern) {
2160    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2161      + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead");
2162    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2163      + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead");
2164    return getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2165  }
2166
2167  private CompletableFuture<List<SnapshotDescription>>
2168    getCompletedSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern) {
2169    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
2170    addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
2171      if (err != null) {
2172        future.completeExceptionally(err);
2173        return;
2174      }
2175      if (tableNames == null || tableNames.size() <= 0) {
2176        future.complete(Collections.emptyList());
2177        return;
2178      }
2179      addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
2180        if (err2 != null) {
2181          future.completeExceptionally(err2);
2182          return;
2183        }
2184        if (snapshotDescList == null || snapshotDescList.isEmpty()) {
2185          future.complete(Collections.emptyList());
2186          return;
2187        }
2188        future.complete(snapshotDescList.stream()
2189          .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
2190          .collect(Collectors.toList()));
2191      });
2192    });
2193    return future;
2194  }
2195
2196  @Override
2197  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
2198    return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
2199  }
2200
2201  @Override
2202  public CompletableFuture<Void> deleteSnapshots() {
2203    return internalDeleteSnapshots(null, null);
2204  }
2205
2206  @Override
2207  public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
2208    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2209      + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead");
2210    return internalDeleteSnapshots(null, snapshotNamePattern);
2211  }
2212
2213  @Override
2214  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) {
2215    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2216      + " If you don't specify a tableNamePattern, use deleteSnapshots() instead");
2217    return internalDeleteSnapshots(tableNamePattern, null);
2218  }
2219
2220  @Override
2221  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
2222    Pattern snapshotNamePattern) {
2223    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2224      + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead");
2225    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2226      + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead");
2227    return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern);
2228  }
2229
2230  private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern,
2231    Pattern snapshotNamePattern) {
2232    CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture;
2233    if (tableNamePattern == null) {
2234      listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern);
2235    } else {
2236      listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2237    }
2238    CompletableFuture<Void> future = new CompletableFuture<>();
2239    addListener(listSnapshotsFuture, ((snapshotDescriptions, err) -> {
2240      if (err != null) {
2241        future.completeExceptionally(err);
2242        return;
2243      }
2244      if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
2245        future.complete(null);
2246        return;
2247      }
2248      addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
2249        .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
2250          if (e != null) {
2251            future.completeExceptionally(e);
2252          } else {
2253            future.complete(v);
2254          }
2255        });
2256    }));
2257    return future;
2258  }
2259
2260  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
2261    return this.<Void> newMasterCaller()
2262      .action((controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
2263        controller, stub,
2264        DeleteSnapshotRequest.newBuilder()
2265          .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
2266        (s, c, req, done) -> s.deleteSnapshot(c, req, done), resp -> null))
2267      .call();
2268  }
2269
2270  @Override
2271  public CompletableFuture<Void> execProcedure(String signature, String instance,
2272    Map<String, String> props) {
2273    CompletableFuture<Void> future = new CompletableFuture<>();
2274    ProcedureDescription procDesc =
2275      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2276    addListener(this.<Long> newMasterCaller()
2277      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
2278        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
2279        (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
2280      .call(), (expectedTimeout, err) -> {
2281        if (err != null) {
2282          future.completeExceptionally(err);
2283          return;
2284        }
2285        TimerTask pollingTask = new TimerTask() {
2286          int tries = 0;
2287          long startTime = EnvironmentEdgeManager.currentTime();
2288          long endTime = startTime + expectedTimeout;
2289          long maxPauseTime = expectedTimeout / maxAttempts;
2290
2291          @Override
2292          public void run(Timeout timeout) throws Exception {
2293            if (EnvironmentEdgeManager.currentTime() < endTime) {
2294              addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
2295                if (err2 != null) {
2296                  future.completeExceptionally(err2);
2297                  return;
2298                }
2299                if (done) {
2300                  future.complete(null);
2301                } else {
2302                  // retry again after pauseTime.
2303                  long pauseTime =
2304                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2305                  pauseTime = Math.min(pauseTime, maxPauseTime);
2306                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
2307                    TimeUnit.MICROSECONDS);
2308                }
2309              });
2310            } else {
2311              future.completeExceptionally(new IOException("Procedure '" + signature + " : "
2312                + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
2313            }
2314          }
2315        };
2316        // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
2317        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2318      });
2319    return future;
2320  }
2321
2322  @Override
2323  public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance,
2324    Map<String, String> props) {
2325    ProcedureDescription proDesc =
2326      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2327    return this.<byte[]> newMasterCaller()
2328      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
2329        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
2330        (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
2331        resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
2332      .call();
2333  }
2334
2335  @Override
2336  public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
2337    Map<String, String> props) {
2338    ProcedureDescription proDesc =
2339      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2340    return this.<Boolean> newMasterCaller()
2341      .action(
2342        (controller, stub) -> this.<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(
2343          controller, stub, IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
2344          (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
2345      .call();
2346  }
2347
2348  @Override
2349  public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
2350    return this.<Boolean> newMasterCaller().action(
2351      (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
2352        controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
2353        (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
2354      .call();
2355  }
2356
2357  @Override
2358  public CompletableFuture<String> getProcedures() {
2359    return this.<String> newMasterCaller()
2360      .action((controller, stub) -> this.<GetProceduresRequest, GetProceduresResponse, String> call(
2361        controller, stub, GetProceduresRequest.newBuilder().build(),
2362        (s, c, req, done) -> s.getProcedures(c, req, done),
2363        resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList())))
2364      .call();
2365  }
2366
2367  @Override
2368  public CompletableFuture<String> getLocks() {
2369    return this.<String> newMasterCaller()
2370      .action(
2371        (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(controller,
2372          stub, GetLocksRequest.newBuilder().build(), (s, c, req, done) -> s.getLocks(c, req, done),
2373          resp -> ProtobufUtil.toLockJson(resp.getLockList())))
2374      .call();
2375  }
2376
2377  @Override
2378  public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers,
2379    boolean offload) {
2380    return this.<Void> newMasterCaller()
2381      .action((controller, stub) -> this.<DecommissionRegionServersRequest,
2382        DecommissionRegionServersResponse, Void> call(controller, stub,
2383          RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
2384          (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
2385      .call();
2386  }
2387
2388  @Override
2389  public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
2390    return this.<List<ServerName>> newMasterCaller()
2391      .action((controller, stub) -> this.<ListDecommissionedRegionServersRequest,
2392        ListDecommissionedRegionServersResponse, List<ServerName>> call(controller, stub,
2393          ListDecommissionedRegionServersRequest.newBuilder().build(),
2394          (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
2395          resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
2396            .collect(Collectors.toList())))
2397      .call();
2398  }
2399
2400  @Override
2401  public CompletableFuture<Void> recommissionRegionServer(ServerName server,
2402    List<byte[]> encodedRegionNames) {
2403    return this.<Void> newMasterCaller()
2404      .action((controller, stub) -> this.<RecommissionRegionServerRequest,
2405        RecommissionRegionServerResponse, Void> call(controller, stub,
2406          RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames),
2407          (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null))
2408      .call();
2409  }
2410
2411  /**
2412   * Get the region location for the passed region name. The region name may be a full region name
2413   * or encoded region name. If the region does not found, then it'll throw an
2414   * UnknownRegionException wrapped by a {@link CompletableFuture}
2415   * @param regionNameOrEncodedRegionName region name or encoded region name
2416   * @return region location, wrapped by a {@link CompletableFuture}
2417   */
2418  CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
2419    if (regionNameOrEncodedRegionName == null) {
2420      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2421    }
2422
2423    CompletableFuture<Optional<HRegionLocation>> future;
2424    if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
2425      String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
2426      if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
2427        // old format encodedName, should be meta region
2428        future = connection.registry.getMetaRegionLocations()
2429          .thenApply(locs -> Stream.of(locs.getRegionLocations())
2430            .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
2431      } else {
2432        future = ClientMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
2433          regionNameOrEncodedRegionName);
2434      }
2435    } else {
2436      // Not all regionNameOrEncodedRegionName here is going to be a valid region name,
2437      // it needs to throw out IllegalArgumentException in case tableName is passed in.
2438      RegionInfo regionInfo;
2439      try {
2440        regionInfo =
2441          CatalogFamilyFormat.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName);
2442      } catch (IOException ioe) {
2443        return failedFuture(new IllegalArgumentException(ioe.getMessage()));
2444      }
2445
2446      if (regionInfo.isMetaRegion()) {
2447        future = connection.registry.getMetaRegionLocations()
2448          .thenApply(locs -> Stream.of(locs.getRegionLocations())
2449            .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
2450            .findFirst());
2451      } else {
2452        future =
2453          ClientMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
2454      }
2455    }
2456
2457    CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
2458    addListener(future, (location, err) -> {
2459      if (err != null) {
2460        returnedFuture.completeExceptionally(err);
2461        return;
2462      }
2463      if (!location.isPresent() || location.get().getRegion() == null) {
2464        returnedFuture.completeExceptionally(
2465          new UnknownRegionException("Invalid region name or encoded region name: "
2466            + Bytes.toStringBinary(regionNameOrEncodedRegionName)));
2467      } else {
2468        returnedFuture.complete(location.get());
2469      }
2470    });
2471    return returnedFuture;
2472  }
2473
2474  /**
2475   * Get the region info for the passed region name. The region name may be a full region name or
2476   * encoded region name. If the region does not found, then it'll throw an UnknownRegionException
2477   * wrapped by a {@link CompletableFuture}
2478   * @return region info, wrapped by a {@link CompletableFuture}
2479   */
2480  private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
2481    if (regionNameOrEncodedRegionName == null) {
2482      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2483    }
2484
2485    if (
2486      Bytes.equals(regionNameOrEncodedRegionName,
2487        RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName())
2488        || Bytes.equals(regionNameOrEncodedRegionName,
2489          RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())
2490    ) {
2491      return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
2492    }
2493
2494    CompletableFuture<RegionInfo> future = new CompletableFuture<>();
2495    addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
2496      if (err != null) {
2497        future.completeExceptionally(err);
2498      } else {
2499        future.complete(location.getRegion());
2500      }
2501    });
2502    return future;
2503  }
2504
2505  private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
2506    if (numRegions < 3) {
2507      throw new IllegalArgumentException("Must create at least three regions");
2508    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
2509      throw new IllegalArgumentException("Start key must be smaller than end key");
2510    }
2511    if (numRegions == 3) {
2512      return new byte[][] { startKey, endKey };
2513    }
2514    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
2515    if (splitKeys == null || splitKeys.length != numRegions - 1) {
2516      throw new IllegalArgumentException("Unable to split key range into enough regions");
2517    }
2518    return splitKeys;
2519  }
2520
2521  private void verifySplitKeys(byte[][] splitKeys) {
2522    Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
2523    // Verify there are no duplicate split keys
2524    byte[] lastKey = null;
2525    for (byte[] splitKey : splitKeys) {
2526      if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
2527        throw new IllegalArgumentException("Empty split key must not be passed in the split keys.");
2528      }
2529      if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
2530        throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: "
2531          + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey));
2532      }
2533      lastKey = splitKey;
2534    }
2535  }
2536
2537  private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
2538
2539    abstract void onFinished();
2540
2541    abstract void onError(Throwable error);
2542
2543    @Override
2544    public void accept(Void v, Throwable error) {
2545      if (error != null) {
2546        onError(error);
2547        return;
2548      }
2549      onFinished();
2550    }
2551  }
2552
2553  private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
2554    protected final TableName tableName;
2555
2556    TableProcedureBiConsumer(TableName tableName) {
2557      this.tableName = tableName;
2558    }
2559
2560    abstract String getOperationType();
2561
2562    String getDescription() {
2563      return "Operation: " + getOperationType() + ", " + "Table Name: "
2564        + tableName.getNameWithNamespaceInclAsString();
2565    }
2566
2567    @Override
2568    void onFinished() {
2569      LOG.info(getDescription() + " completed");
2570    }
2571
2572    @Override
2573    void onError(Throwable error) {
2574      LOG.info(getDescription() + " failed with " + error.getMessage());
2575    }
2576  }
2577
2578  private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
2579    protected final String namespaceName;
2580
2581    NamespaceProcedureBiConsumer(String namespaceName) {
2582      this.namespaceName = namespaceName;
2583    }
2584
2585    abstract String getOperationType();
2586
2587    String getDescription() {
2588      return "Operation: " + getOperationType() + ", Namespace: " + namespaceName;
2589    }
2590
2591    @Override
2592    void onFinished() {
2593      LOG.info(getDescription() + " completed");
2594    }
2595
2596    @Override
2597    void onError(Throwable error) {
2598      LOG.info(getDescription() + " failed with " + error.getMessage());
2599    }
2600  }
2601
2602  private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2603
2604    CreateTableProcedureBiConsumer(TableName tableName) {
2605      super(tableName);
2606    }
2607
2608    @Override
2609    String getOperationType() {
2610      return "CREATE";
2611    }
2612  }
2613
2614  private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
2615
2616    ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2617      super(tableName);
2618    }
2619
2620    @Override
2621    String getOperationType() {
2622      return "MODIFY";
2623    }
2624  }
2625
2626  private static class ModifyTableStoreFileTrackerProcedureBiConsumer
2627    extends TableProcedureBiConsumer {
2628
2629    ModifyTableStoreFileTrackerProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2630      super(tableName);
2631    }
2632
2633    @Override
2634    String getOperationType() {
2635      return "MODIFY_TABLE_STORE_FILE_TRACKER";
2636    }
2637  }
2638
2639  private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
2640
2641    DeleteTableProcedureBiConsumer(TableName tableName) {
2642      super(tableName);
2643    }
2644
2645    @Override
2646    String getOperationType() {
2647      return "DELETE";
2648    }
2649
2650    @Override
2651    void onFinished() {
2652      connection.getLocator().clearCache(this.tableName);
2653      super.onFinished();
2654    }
2655  }
2656
2657  private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2658
2659    TruncateTableProcedureBiConsumer(TableName tableName) {
2660      super(tableName);
2661    }
2662
2663    @Override
2664    String getOperationType() {
2665      return "TRUNCATE";
2666    }
2667  }
2668
2669  private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2670
2671    EnableTableProcedureBiConsumer(TableName tableName) {
2672      super(tableName);
2673    }
2674
2675    @Override
2676    String getOperationType() {
2677      return "ENABLE";
2678    }
2679  }
2680
2681  private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2682
2683    DisableTableProcedureBiConsumer(TableName tableName) {
2684      super(tableName);
2685    }
2686
2687    @Override
2688    String getOperationType() {
2689      return "DISABLE";
2690    }
2691  }
2692
2693  private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2694
2695    AddColumnFamilyProcedureBiConsumer(TableName tableName) {
2696      super(tableName);
2697    }
2698
2699    @Override
2700    String getOperationType() {
2701      return "ADD_COLUMN_FAMILY";
2702    }
2703  }
2704
2705  private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2706
2707    DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
2708      super(tableName);
2709    }
2710
2711    @Override
2712    String getOperationType() {
2713      return "DELETE_COLUMN_FAMILY";
2714    }
2715  }
2716
2717  private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2718
2719    ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
2720      super(tableName);
2721    }
2722
2723    @Override
2724    String getOperationType() {
2725      return "MODIFY_COLUMN_FAMILY";
2726    }
2727  }
2728
2729  private static class ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer
2730    extends TableProcedureBiConsumer {
2731
2732    ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(TableName tableName) {
2733      super(tableName);
2734    }
2735
2736    @Override
2737    String getOperationType() {
2738      return "MODIFY_COLUMN_FAMILY_STORE_FILE_TRACKER";
2739    }
2740  }
2741
2742  private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2743
2744    CreateNamespaceProcedureBiConsumer(String namespaceName) {
2745      super(namespaceName);
2746    }
2747
2748    @Override
2749    String getOperationType() {
2750      return "CREATE_NAMESPACE";
2751    }
2752  }
2753
2754  private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2755
2756    DeleteNamespaceProcedureBiConsumer(String namespaceName) {
2757      super(namespaceName);
2758    }
2759
2760    @Override
2761    String getOperationType() {
2762      return "DELETE_NAMESPACE";
2763    }
2764  }
2765
2766  private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2767
2768    ModifyNamespaceProcedureBiConsumer(String namespaceName) {
2769      super(namespaceName);
2770    }
2771
2772    @Override
2773    String getOperationType() {
2774      return "MODIFY_NAMESPACE";
2775    }
2776  }
2777
2778  private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2779
2780    MergeTableRegionProcedureBiConsumer(TableName tableName) {
2781      super(tableName);
2782    }
2783
2784    @Override
2785    String getOperationType() {
2786      return "MERGE_REGIONS";
2787    }
2788  }
2789
2790  private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2791
2792    SplitTableRegionProcedureBiConsumer(TableName tableName) {
2793      super(tableName);
2794    }
2795
2796    @Override
2797    String getOperationType() {
2798      return "SPLIT_REGION";
2799    }
2800  }
2801
2802  private static class SnapshotProcedureBiConsumer extends TableProcedureBiConsumer {
2803    SnapshotProcedureBiConsumer(TableName tableName) {
2804      super(tableName);
2805    }
2806
2807    @Override
2808    String getOperationType() {
2809      return "SNAPSHOT";
2810    }
2811  }
2812
2813  private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
2814    private final String peerId;
2815    private final Supplier<String> getOperation;
2816
2817    ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) {
2818      this.peerId = peerId;
2819      this.getOperation = getOperation;
2820    }
2821
2822    String getDescription() {
2823      return "Operation: " + getOperation.get() + ", peerId: " + peerId;
2824    }
2825
2826    @Override
2827    void onFinished() {
2828      LOG.info(getDescription() + " completed");
2829    }
2830
2831    @Override
2832    void onError(Throwable error) {
2833      LOG.info(getDescription() + " failed with " + error.getMessage());
2834    }
2835  }
2836
2837  private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
2838    CompletableFuture<Void> future = new CompletableFuture<>();
2839    addListener(procFuture, (procId, error) -> {
2840      if (error != null) {
2841        future.completeExceptionally(error);
2842        return;
2843      }
2844      getProcedureResult(procId, future, 0);
2845    });
2846    return future;
2847  }
2848
2849  private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) {
2850    addListener(
2851      this.<GetProcedureResultResponse> newMasterCaller()
2852        .action((controller, stub) -> this.<GetProcedureResultRequest, GetProcedureResultResponse,
2853          GetProcedureResultResponse> call(controller, stub,
2854            GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
2855            (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
2856        .call(),
2857      (response, error) -> {
2858        if (error != null) {
2859          LOG.warn("failed to get the procedure result procId={}", procId,
2860            ConnectionUtils.translateException(error));
2861          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2862            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2863          return;
2864        }
2865        if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
2866          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2867            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2868          return;
2869        }
2870        if (response.hasException()) {
2871          IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
2872          future.completeExceptionally(ioe);
2873        } else {
2874          future.complete(null);
2875        }
2876      });
2877  }
2878
2879  private <T> CompletableFuture<T> failedFuture(Throwable error) {
2880    CompletableFuture<T> future = new CompletableFuture<>();
2881    future.completeExceptionally(error);
2882    return future;
2883  }
2884
2885  private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) {
2886    if (error != null) {
2887      future.completeExceptionally(error);
2888      return true;
2889    }
2890    return false;
2891  }
2892
2893  @Override
2894  public CompletableFuture<ClusterMetrics> getClusterMetrics() {
2895    return getClusterMetrics(EnumSet.allOf(Option.class));
2896  }
2897
2898  @Override
2899  public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) {
2900    return this.<ClusterMetrics> newMasterCaller()
2901      .action((controller, stub) -> this.<GetClusterStatusRequest, GetClusterStatusResponse,
2902        ClusterMetrics> call(controller, stub,
2903          RequestConverter.buildGetClusterStatusRequest(options),
2904          (s, c, req, done) -> s.getClusterStatus(c, req, done),
2905          resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus())))
2906      .call();
2907  }
2908
2909  @Override
2910  public CompletableFuture<Void> shutdown() {
2911    return this.<Void> newMasterCaller().priority(HIGH_QOS)
2912      .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
2913        stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done),
2914        resp -> null))
2915      .call();
2916  }
2917
2918  @Override
2919  public CompletableFuture<Void> stopMaster() {
2920    return this.<Void> newMasterCaller().priority(HIGH_QOS)
2921      .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(
2922        controller, stub, StopMasterRequest.newBuilder().build(),
2923        (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null))
2924      .call();
2925  }
2926
2927  @Override
2928  public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
2929    StopServerRequest request = RequestConverter
2930      .buildStopServerRequest("Called by admin client " + this.connection.toString());
2931    return this.<Void> newAdminCaller().priority(HIGH_QOS)
2932      .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
2933        controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
2934        resp -> null))
2935      .serverName(serverName).call();
2936  }
2937
2938  @Override
2939  public CompletableFuture<Void> updateConfiguration(ServerName serverName) {
2940    return this.<Void> newAdminCaller()
2941      .action((controller, stub) -> this.<UpdateConfigurationRequest, UpdateConfigurationResponse,
2942        Void> adminCall(controller, stub, UpdateConfigurationRequest.getDefaultInstance(),
2943          (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null))
2944      .serverName(serverName).call();
2945  }
2946
2947  @Override
2948  public CompletableFuture<Void> updateConfiguration() {
2949    CompletableFuture<Void> future = new CompletableFuture<Void>();
2950    addListener(
2951      getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)),
2952      (status, err) -> {
2953        if (err != null) {
2954          future.completeExceptionally(err);
2955        } else {
2956          List<CompletableFuture<Void>> futures = new ArrayList<>();
2957          status.getServersName().forEach(server -> futures.add(updateConfiguration(server)));
2958          futures.add(updateConfiguration(status.getMasterName()));
2959          status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
2960          addListener(
2961            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
2962            (result, err2) -> {
2963              if (err2 != null) {
2964                future.completeExceptionally(err2);
2965              } else {
2966                future.complete(result);
2967              }
2968            });
2969        }
2970      });
2971    return future;
2972  }
2973
2974  @Override
2975  public CompletableFuture<Void> updateConfiguration(String groupName) {
2976    CompletableFuture<Void> future = new CompletableFuture<Void>();
2977    addListener(getRSGroup(groupName), (rsGroupInfo, err) -> {
2978      if (err != null) {
2979        future.completeExceptionally(err);
2980      } else if (rsGroupInfo == null) {
2981        future.completeExceptionally(
2982          new IllegalArgumentException("Group does not exist: " + groupName));
2983      } else {
2984        addListener(getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)), (status, err2) -> {
2985          if (err2 != null) {
2986            future.completeExceptionally(err2);
2987          } else {
2988            List<CompletableFuture<Void>> futures = new ArrayList<>();
2989            List<ServerName> groupServers = status.getServersName().stream()
2990              .filter(s -> rsGroupInfo.containsServer(s.getAddress())).collect(Collectors.toList());
2991            groupServers.forEach(server -> futures.add(updateConfiguration(server)));
2992            addListener(
2993              CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
2994              (result, err3) -> {
2995                if (err3 != null) {
2996                  future.completeExceptionally(err3);
2997                } else {
2998                  future.complete(result);
2999                }
3000              });
3001          }
3002        });
3003      }
3004    });
3005    return future;
3006  }
3007
3008  @Override
3009  public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
3010    return this.<Void> newAdminCaller()
3011      .action((controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse,
3012        Void> adminCall(controller, stub, RequestConverter.buildRollWALWriterRequest(),
3013          (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null))
3014      .serverName(serverName).call();
3015  }
3016
3017  @Override
3018  public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
3019    return this.<Void> newAdminCaller()
3020      .action((controller, stub) -> this.<ClearCompactionQueuesRequest,
3021        ClearCompactionQueuesResponse, Void> adminCall(controller, stub,
3022          RequestConverter.buildClearCompactionQueuesRequest(queues),
3023          (s, c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
3024      .serverName(serverName).call();
3025  }
3026
3027  @Override
3028  public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
3029    return this.<List<SecurityCapability>> newMasterCaller()
3030      .action((controller, stub) -> this.<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse,
3031        List<SecurityCapability>> call(controller, stub,
3032          SecurityCapabilitiesRequest.newBuilder().build(),
3033          (s, c, req, done) -> s.getSecurityCapabilities(c, req, done),
3034          (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList())))
3035      .call();
3036  }
3037
3038  @Override
3039  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) {
3040    return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName);
3041  }
3042
3043  @Override
3044  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName,
3045    TableName tableName) {
3046    Preconditions.checkNotNull(tableName,
3047      "tableName is null. If you don't specify a tableName, use getRegionLoads() instead");
3048    return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName);
3049  }
3050
3051  private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request,
3052    ServerName serverName) {
3053    return this.<List<RegionMetrics>> newAdminCaller()
3054      .action((controller, stub) -> this.<GetRegionLoadRequest, GetRegionLoadResponse,
3055        List<RegionMetrics>> adminCall(controller, stub, request,
3056          (s, c, req, done) -> s.getRegionLoad(controller, req, done),
3057          RegionMetricsBuilder::toRegionMetrics))
3058      .serverName(serverName).call();
3059  }
3060
3061  @Override
3062  public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
3063    return this.<Boolean> newMasterCaller()
3064      .action((controller, stub) -> this.<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse,
3065        Boolean> call(controller, stub, IsInMaintenanceModeRequest.newBuilder().build(),
3066          (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
3067          resp -> resp.getInMaintenanceMode()))
3068      .call();
3069  }
3070
3071  @Override
3072  public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
3073    CompactType compactType) {
3074    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3075
3076    switch (compactType) {
3077      case MOB:
3078        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
3079          if (err != null) {
3080            future.completeExceptionally(err);
3081            return;
3082          }
3083          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
3084
3085          addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
3086            .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse,
3087              GetRegionInfoResponse> adminCall(controller, stub,
3088                RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
3089                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3090            .call(), (resp2, err2) -> {
3091              if (err2 != null) {
3092                future.completeExceptionally(err2);
3093              } else {
3094                if (resp2.hasCompactionState()) {
3095                  future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3096                } else {
3097                  future.complete(CompactionState.NONE);
3098                }
3099              }
3100            });
3101        });
3102        break;
3103      case NORMAL:
3104        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3105          if (err != null) {
3106            future.completeExceptionally(err);
3107            return;
3108          }
3109          ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>();
3110          List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
3111          locations.stream().filter(loc -> loc.getServerName() != null)
3112            .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
3113            .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
3114              futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
3115                // If any region compaction state is MAJOR_AND_MINOR
3116                // the table compaction state is MAJOR_AND_MINOR, too.
3117                if (err2 != null) {
3118                  future.completeExceptionally(unwrapCompletionException(err2));
3119                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
3120                  future.complete(regionState);
3121                } else {
3122                  regionStates.add(regionState);
3123                }
3124              }));
3125            });
3126          addListener(
3127            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3128            (ret, err3) -> {
3129              // If future not completed, check all regions's compaction state
3130              if (!future.isCompletedExceptionally() && !future.isDone()) {
3131                CompactionState state = CompactionState.NONE;
3132                for (CompactionState regionState : regionStates) {
3133                  switch (regionState) {
3134                    case MAJOR:
3135                      if (state == CompactionState.MINOR) {
3136                        future.complete(CompactionState.MAJOR_AND_MINOR);
3137                      } else {
3138                        state = CompactionState.MAJOR;
3139                      }
3140                      break;
3141                    case MINOR:
3142                      if (state == CompactionState.MAJOR) {
3143                        future.complete(CompactionState.MAJOR_AND_MINOR);
3144                      } else {
3145                        state = CompactionState.MINOR;
3146                      }
3147                      break;
3148                    case NONE:
3149                    default:
3150                  }
3151                }
3152                if (!future.isDone()) {
3153                  future.complete(state);
3154                }
3155              }
3156            });
3157        });
3158        break;
3159      default:
3160        throw new IllegalArgumentException("Unknown compactType: " + compactType);
3161    }
3162
3163    return future;
3164  }
3165
3166  @Override
3167  public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
3168    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3169    addListener(getRegionLocation(regionName), (location, err) -> {
3170      if (err != null) {
3171        future.completeExceptionally(err);
3172        return;
3173      }
3174      ServerName serverName = location.getServerName();
3175      if (serverName == null) {
3176        future
3177          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
3178        return;
3179      }
3180      addListener(
3181        this.<GetRegionInfoResponse> newAdminCaller()
3182          .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse,
3183            GetRegionInfoResponse> adminCall(controller, stub,
3184              RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(),
3185                true),
3186              (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3187          .serverName(serverName).call(),
3188        (resp2, err2) -> {
3189          if (err2 != null) {
3190            future.completeExceptionally(err2);
3191          } else {
3192            if (resp2.hasCompactionState()) {
3193              future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3194            } else {
3195              future.complete(CompactionState.NONE);
3196            }
3197          }
3198        });
3199    });
3200    return future;
3201  }
3202
3203  @Override
3204  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
3205    MajorCompactionTimestampRequest request = MajorCompactionTimestampRequest.newBuilder()
3206      .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
3207    return this.<Optional<Long>> newMasterCaller()
3208      .action((controller, stub) -> this.<MajorCompactionTimestampRequest,
3209        MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, request,
3210          (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done),
3211          ProtobufUtil::toOptionalTimestamp))
3212      .call();
3213  }
3214
3215  @Override
3216  public CompletableFuture<Optional<Long>>
3217    getLastMajorCompactionTimestampForRegion(byte[] regionName) {
3218    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
3219    // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
3220    addListener(getRegionInfo(regionName), (region, err) -> {
3221      if (err != null) {
3222        future.completeExceptionally(err);
3223        return;
3224      }
3225      MajorCompactionTimestampForRegionRequest.Builder builder =
3226        MajorCompactionTimestampForRegionRequest.newBuilder();
3227      builder.setRegion(
3228        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
3229      addListener(this.<Optional<Long>> newMasterCaller()
3230        .action((controller, stub) -> this.<MajorCompactionTimestampForRegionRequest,
3231          MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, builder.build(),
3232            (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done),
3233            ProtobufUtil::toOptionalTimestamp))
3234        .call(), (timestamp, err2) -> {
3235          if (err2 != null) {
3236            future.completeExceptionally(err2);
3237          } else {
3238            future.complete(timestamp);
3239          }
3240        });
3241    });
3242    return future;
3243  }
3244
3245  @Override
3246  public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
3247    List<String> serverNamesList) {
3248    CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>();
3249    addListener(getRegionServerList(serverNamesList), (serverNames, err) -> {
3250      if (err != null) {
3251        future.completeExceptionally(err);
3252        return;
3253      }
3254      // Accessed by multiple threads.
3255      Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size());
3256      List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size());
3257      serverNames.stream().forEach(serverName -> {
3258        futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
3259          if (err2 != null) {
3260            future.completeExceptionally(unwrapCompletionException(err2));
3261          } else {
3262            serverStates.put(serverName, serverState);
3263          }
3264        }));
3265      });
3266      addListener(
3267        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3268        (ret, err3) -> {
3269          if (!future.isCompletedExceptionally()) {
3270            if (err3 != null) {
3271              future.completeExceptionally(err3);
3272            } else {
3273              future.complete(serverStates);
3274            }
3275          }
3276        });
3277    });
3278    return future;
3279  }
3280
3281  private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) {
3282    CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
3283    if (serverNamesList.isEmpty()) {
3284      CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture =
3285        getClusterMetrics(EnumSet.of(Option.SERVERS_NAME));
3286      addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> {
3287        if (err != null) {
3288          future.completeExceptionally(err);
3289        } else {
3290          future.complete(clusterMetrics.getServersName());
3291        }
3292      });
3293      return future;
3294    } else {
3295      List<ServerName> serverList = new ArrayList<>();
3296      for (String regionServerName : serverNamesList) {
3297        ServerName serverName = null;
3298        try {
3299          serverName = ServerName.valueOf(regionServerName);
3300        } catch (Exception e) {
3301          future.completeExceptionally(
3302            new IllegalArgumentException(String.format("ServerName format: %s", regionServerName)));
3303        }
3304        if (serverName == null) {
3305          future.completeExceptionally(
3306            new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName)));
3307        } else {
3308          serverList.add(serverName);
3309        }
3310      }
3311      future.complete(serverList);
3312    }
3313    return future;
3314  }
3315
3316  private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) {
3317    return this.<Boolean> newAdminCaller().serverName(serverName)
3318      .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse,
3319        Boolean> adminCall(controller, stub,
3320          CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(),
3321          (s, c, req, done) -> s.compactionSwitch(c, req, done), resp -> resp.getPrevState()))
3322      .call();
3323  }
3324
3325  @Override
3326  public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) {
3327    return this.<Boolean> newMasterCaller()
3328      .action((controller, stub) -> this.<SetBalancerRunningRequest, SetBalancerRunningResponse,
3329        Boolean> call(controller, stub,
3330          RequestConverter.buildSetBalancerRunningRequest(on, drainRITs),
3331          (s, c, req, done) -> s.setBalancerRunning(c, req, done),
3332          (resp) -> resp.getPrevBalanceValue()))
3333      .call();
3334  }
3335
3336  @Override
3337  public CompletableFuture<BalanceResponse> balance(BalanceRequest request) {
3338    return this.<BalanceResponse> newMasterCaller()
3339      .action((controller, stub) -> this.<MasterProtos.BalanceRequest, MasterProtos.BalanceResponse,
3340        BalanceResponse> call(controller, stub, ProtobufUtil.toBalanceRequest(request),
3341          (s, c, req, done) -> s.balance(c, req, done),
3342          (resp) -> ProtobufUtil.toBalanceResponse(resp)))
3343      .call();
3344  }
3345
3346  @Override
3347  public CompletableFuture<Boolean> isBalancerEnabled() {
3348    return this.<Boolean> newMasterCaller()
3349      .action((controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse,
3350        Boolean> call(controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
3351          (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3352      .call();
3353  }
3354
3355  @Override
3356  public CompletableFuture<Boolean> normalizerSwitch(boolean on) {
3357    return this.<Boolean> newMasterCaller()
3358      .action((controller, stub) -> this.<SetNormalizerRunningRequest, SetNormalizerRunningResponse,
3359        Boolean> call(controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on),
3360          (s, c, req, done) -> s.setNormalizerRunning(c, req, done),
3361          (resp) -> resp.getPrevNormalizerValue()))
3362      .call();
3363  }
3364
3365  @Override
3366  public CompletableFuture<Boolean> isNormalizerEnabled() {
3367    return this.<Boolean> newMasterCaller()
3368      .action((controller, stub) -> this.<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse,
3369        Boolean> call(controller, stub, RequestConverter.buildIsNormalizerEnabledRequest(),
3370          (s, c, req, done) -> s.isNormalizerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3371      .call();
3372  }
3373
3374  @Override
3375  public CompletableFuture<Boolean> normalize(NormalizeTableFilterParams ntfp) {
3376    return normalize(RequestConverter.buildNormalizeRequest(ntfp));
3377  }
3378
3379  private CompletableFuture<Boolean> normalize(NormalizeRequest request) {
3380    return this.<Boolean> newMasterCaller().action((controller, stub) -> this.call(controller, stub,
3381      request, MasterService.Interface::normalize, NormalizeResponse::getNormalizerRan)).call();
3382  }
3383
3384  @Override
3385  public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) {
3386    return this.<Boolean> newMasterCaller()
3387      .action((controller, stub) -> this.<SetCleanerChoreRunningRequest,
3388        SetCleanerChoreRunningResponse, Boolean> call(controller, stub,
3389          RequestConverter.buildSetCleanerChoreRunningRequest(enabled),
3390          (s, c, req, done) -> s.setCleanerChoreRunning(c, req, done),
3391          (resp) -> resp.getPrevValue()))
3392      .call();
3393  }
3394
3395  @Override
3396  public CompletableFuture<Boolean> isCleanerChoreEnabled() {
3397    return this.<Boolean> newMasterCaller()
3398      .action(
3399        (controller, stub) -> this.<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse,
3400          Boolean> call(controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(),
3401            (s, c, req, done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue()))
3402      .call();
3403  }
3404
3405  @Override
3406  public CompletableFuture<Boolean> runCleanerChore() {
3407    return this.<Boolean> newMasterCaller()
3408      .action((controller, stub) -> this.<RunCleanerChoreRequest, RunCleanerChoreResponse,
3409        Boolean> call(controller, stub, RequestConverter.buildRunCleanerChoreRequest(),
3410          (s, c, req, done) -> s.runCleanerChore(c, req, done),
3411          (resp) -> resp.getCleanerChoreRan()))
3412      .call();
3413  }
3414
3415  @Override
3416  public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) {
3417    return this.<Boolean> newMasterCaller()
3418      .action((controller, stub) -> this.<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse,
3419        Boolean> call(controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled),
3420          (s, c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp.getPrevValue()))
3421      .call();
3422  }
3423
3424  @Override
3425  public CompletableFuture<Boolean> isCatalogJanitorEnabled() {
3426    return this.<Boolean> newMasterCaller()
3427      .action((controller, stub) -> this.<IsCatalogJanitorEnabledRequest,
3428        IsCatalogJanitorEnabledResponse, Boolean> call(controller, stub,
3429          RequestConverter.buildIsCatalogJanitorEnabledRequest(),
3430          (s, c, req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp.getValue()))
3431      .call();
3432  }
3433
3434  @Override
3435  public CompletableFuture<Integer> runCatalogJanitor() {
3436    return this.<Integer> newMasterCaller()
3437      .action((controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse,
3438        Integer> call(controller, stub, RequestConverter.buildCatalogScanRequest(),
3439          (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
3440      .call();
3441  }
3442
3443  @Override
3444  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3445    ServiceCaller<S, R> callable) {
3446    MasterCoprocessorRpcChannelImpl channel =
3447      new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
3448    S stub = stubMaker.apply(channel);
3449    CompletableFuture<R> future = new CompletableFuture<>();
3450    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3451    callable.call(stub, controller, resp -> {
3452      if (controller.failed()) {
3453        future.completeExceptionally(controller.getFailed());
3454      } else {
3455        future.complete(resp);
3456      }
3457    });
3458    return future;
3459  }
3460
3461  @Override
3462  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3463    ServiceCaller<S, R> callable, ServerName serverName) {
3464    RegionServerCoprocessorRpcChannelImpl channel = new RegionServerCoprocessorRpcChannelImpl(
3465      this.<Message> newServerCaller().serverName(serverName));
3466    S stub = stubMaker.apply(channel);
3467    CompletableFuture<R> future = new CompletableFuture<>();
3468    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3469    callable.call(stub, controller, resp -> {
3470      if (controller.failed()) {
3471        future.completeExceptionally(controller.getFailed());
3472      } else {
3473        future.complete(resp);
3474      }
3475    });
3476    return future;
3477  }
3478
3479  @Override
3480  public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
3481    return this.<List<ServerName>> newMasterCaller()
3482      .action((controller, stub) -> this.<ClearDeadServersRequest, ClearDeadServersResponse,
3483        List<ServerName>> call(controller, stub,
3484          RequestConverter.buildClearDeadServersRequest(servers),
3485          (s, c, req, done) -> s.clearDeadServers(c, req, done),
3486          (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
3487      .call();
3488  }
3489
3490  <T> ServerRequestCallerBuilder<T> newServerCaller() {
3491    return this.connection.callerFactory.<T> serverRequest()
3492      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
3493      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
3494      .pause(pauseNs, TimeUnit.NANOSECONDS)
3495      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
3496      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
3497  }
3498
3499  @Override
3500  public CompletableFuture<Void> enableTableReplication(TableName tableName) {
3501    if (tableName == null) {
3502      return failedFuture(new IllegalArgumentException("Table name is null"));
3503    }
3504    CompletableFuture<Void> future = new CompletableFuture<>();
3505    addListener(tableExists(tableName), (exist, err) -> {
3506      if (err != null) {
3507        future.completeExceptionally(err);
3508        return;
3509      }
3510      if (!exist) {
3511        future.completeExceptionally(new TableNotFoundException(
3512          "Table '" + tableName.getNameAsString() + "' does not exists."));
3513        return;
3514      }
3515      addListener(getTableSplits(tableName), (splits, err1) -> {
3516        if (err1 != null) {
3517          future.completeExceptionally(err1);
3518        } else {
3519          addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> {
3520            if (err2 != null) {
3521              future.completeExceptionally(err2);
3522            } else {
3523              addListener(setTableReplication(tableName, true), (result3, err3) -> {
3524                if (err3 != null) {
3525                  future.completeExceptionally(err3);
3526                } else {
3527                  future.complete(result3);
3528                }
3529              });
3530            }
3531          });
3532        }
3533      });
3534    });
3535    return future;
3536  }
3537
3538  @Override
3539  public CompletableFuture<Void> disableTableReplication(TableName tableName) {
3540    if (tableName == null) {
3541      return failedFuture(new IllegalArgumentException("Table name is null"));
3542    }
3543    CompletableFuture<Void> future = new CompletableFuture<>();
3544    addListener(tableExists(tableName), (exist, err) -> {
3545      if (err != null) {
3546        future.completeExceptionally(err);
3547        return;
3548      }
3549      if (!exist) {
3550        future.completeExceptionally(new TableNotFoundException(
3551          "Table '" + tableName.getNameAsString() + "' does not exists."));
3552        return;
3553      }
3554      addListener(setTableReplication(tableName, false), (result, err2) -> {
3555        if (err2 != null) {
3556          future.completeExceptionally(err2);
3557        } else {
3558          future.complete(result);
3559        }
3560      });
3561    });
3562    return future;
3563  }
3564
3565  private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
3566    CompletableFuture<byte[][]> future = new CompletableFuture<>();
3567    addListener(
3568      getRegions(tableName).thenApply(regions -> regions.stream()
3569        .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())),
3570      (regions, err2) -> {
3571        if (err2 != null) {
3572          future.completeExceptionally(err2);
3573          return;
3574        }
3575        if (regions.size() == 1) {
3576          future.complete(null);
3577        } else {
3578          byte[][] splits = new byte[regions.size() - 1][];
3579          for (int i = 1; i < regions.size(); i++) {
3580            splits[i - 1] = regions.get(i).getStartKey();
3581          }
3582          future.complete(splits);
3583        }
3584      });
3585    return future;
3586  }
3587
3588  /**
3589   * Connect to peer and check the table descriptor on peer:
3590   * <ol>
3591   * <li>Create the same table on peer when not exist.</li>
3592   * <li>Throw an exception if the table already has replication enabled on any of the column
3593   * families.</li>
3594   * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
3595   * </ol>
3596   * @param tableName name of the table to sync to the peer
3597   * @param splits    table split keys
3598   */
3599  private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
3600    byte[][] splits) {
3601    CompletableFuture<Void> future = new CompletableFuture<>();
3602    addListener(listReplicationPeers(), (peers, err) -> {
3603      if (err != null) {
3604        future.completeExceptionally(err);
3605        return;
3606      }
3607      if (peers == null || peers.size() <= 0) {
3608        future.completeExceptionally(
3609          new IllegalArgumentException("Found no peer cluster for replication."));
3610        return;
3611      }
3612      List<CompletableFuture<Void>> futures = new ArrayList<>();
3613      peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
3614        .forEach(peer -> {
3615          futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
3616        });
3617      addListener(
3618        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3619        (result, err2) -> {
3620          if (err2 != null) {
3621            future.completeExceptionally(err2);
3622          } else {
3623            future.complete(result);
3624          }
3625        });
3626    });
3627    return future;
3628  }
3629
3630  private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
3631    ReplicationPeerDescription peer) {
3632    Configuration peerConf = null;
3633    try {
3634      peerConf =
3635        ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer);
3636    } catch (IOException e) {
3637      return failedFuture(e);
3638    }
3639    CompletableFuture<Void> future = new CompletableFuture<>();
3640    addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> {
3641      if (err != null) {
3642        future.completeExceptionally(err);
3643        return;
3644      }
3645      addListener(getDescriptor(tableName), (tableDesc, err1) -> {
3646        if (err1 != null) {
3647          future.completeExceptionally(err1);
3648          return;
3649        }
3650        AsyncAdmin peerAdmin = conn.getAdmin();
3651        addListener(peerAdmin.tableExists(tableName), (exist, err2) -> {
3652          if (err2 != null) {
3653            future.completeExceptionally(err2);
3654            return;
3655          }
3656          if (!exist) {
3657            CompletableFuture<Void> createTableFuture = null;
3658            if (splits == null) {
3659              createTableFuture = peerAdmin.createTable(tableDesc);
3660            } else {
3661              createTableFuture = peerAdmin.createTable(tableDesc, splits);
3662            }
3663            addListener(createTableFuture, (result, err3) -> {
3664              if (err3 != null) {
3665                future.completeExceptionally(err3);
3666              } else {
3667                future.complete(result);
3668              }
3669            });
3670          } else {
3671            addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin),
3672              (result, err4) -> {
3673                if (err4 != null) {
3674                  future.completeExceptionally(err4);
3675                } else {
3676                  future.complete(result);
3677                }
3678              });
3679          }
3680        });
3681      });
3682    });
3683    return future;
3684  }
3685
3686  private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
3687    TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
3688    CompletableFuture<Void> future = new CompletableFuture<>();
3689    addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> {
3690      if (err != null) {
3691        future.completeExceptionally(err);
3692        return;
3693      }
3694      if (peerTableDesc == null) {
3695        future.completeExceptionally(
3696          new IllegalArgumentException("Failed to get table descriptor for table "
3697            + tableName.getNameAsString() + " from peer cluster " + peer.getPeerId()));
3698        return;
3699      }
3700      if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
3701        future.completeExceptionally(new IllegalArgumentException(
3702          "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId()
3703            + ", but the table descriptors are not same when compared with source cluster."
3704            + " Thus can not enable the table's replication switch."));
3705        return;
3706      }
3707      future.complete(null);
3708    });
3709    return future;
3710  }
3711
3712  /**
3713   * Set the table's replication switch if the table's replication switch is already not set.
3714   * @param tableName name of the table
3715   * @param enableRep is replication switch enable or disable
3716   */
3717  private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
3718    CompletableFuture<Void> future = new CompletableFuture<>();
3719    addListener(getDescriptor(tableName), (tableDesc, err) -> {
3720      if (err != null) {
3721        future.completeExceptionally(err);
3722        return;
3723      }
3724      if (!tableDesc.matchReplicationScope(enableRep)) {
3725        int scope =
3726          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
3727        TableDescriptor newTableDesc =
3728          TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
3729        addListener(modifyTable(newTableDesc), (result, err2) -> {
3730          if (err2 != null) {
3731            future.completeExceptionally(err2);
3732          } else {
3733            future.complete(result);
3734          }
3735        });
3736      } else {
3737        future.complete(null);
3738      }
3739    });
3740    return future;
3741  }
3742
3743  @Override
3744  public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
3745    CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
3746    addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3747      if (err != null) {
3748        future.completeExceptionally(err);
3749        return;
3750      }
3751      Map<ServerName, List<RegionInfo>> regionInfoByServerName =
3752        locations.stream().filter(l -> l.getRegion() != null)
3753          .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
3754          .collect(Collectors.groupingBy(l -> l.getServerName(),
3755            Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
3756      List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
3757      CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
3758      for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
3759        futures
3760          .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
3761            if (err2 != null) {
3762              future.completeExceptionally(unwrapCompletionException(err2));
3763            } else {
3764              aggregator.append(stats);
3765            }
3766          }));
3767      }
3768      addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
3769        (ret, err3) -> {
3770          if (err3 != null) {
3771            future.completeExceptionally(unwrapCompletionException(err3));
3772          } else {
3773            future.complete(aggregator.sum());
3774          }
3775        });
3776    });
3777    return future;
3778  }
3779
3780  @Override
3781  public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName,
3782    boolean preserveSplits) {
3783    CompletableFuture<Void> future = new CompletableFuture<>();
3784    addListener(tableExists(tableName), (exist, err) -> {
3785      if (err != null) {
3786        future.completeExceptionally(err);
3787        return;
3788      }
3789      if (!exist) {
3790        future.completeExceptionally(new TableNotFoundException(tableName));
3791        return;
3792      }
3793      addListener(tableExists(newTableName), (exist1, err1) -> {
3794        if (err1 != null) {
3795          future.completeExceptionally(err1);
3796          return;
3797        }
3798        if (exist1) {
3799          future.completeExceptionally(new TableExistsException(newTableName));
3800          return;
3801        }
3802        addListener(getDescriptor(tableName), (tableDesc, err2) -> {
3803          if (err2 != null) {
3804            future.completeExceptionally(err2);
3805            return;
3806          }
3807          TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc);
3808          if (preserveSplits) {
3809            addListener(getTableSplits(tableName), (splits, err3) -> {
3810              if (err3 != null) {
3811                future.completeExceptionally(err3);
3812              } else {
3813                addListener(
3814                  splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc),
3815                  (result, err4) -> {
3816                    if (err4 != null) {
3817                      future.completeExceptionally(err4);
3818                    } else {
3819                      future.complete(result);
3820                    }
3821                  });
3822              }
3823            });
3824          } else {
3825            addListener(createTable(newTableDesc), (result, err5) -> {
3826              if (err5 != null) {
3827                future.completeExceptionally(err5);
3828              } else {
3829                future.complete(result);
3830              }
3831            });
3832          }
3833        });
3834      });
3835    });
3836    return future;
3837  }
3838
3839  private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
3840    List<RegionInfo> hris) {
3841    return this.<CacheEvictionStats> newAdminCaller()
3842      .action((controller, stub) -> this.<ClearRegionBlockCacheRequest,
3843        ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(controller, stub,
3844          RequestConverter.buildClearRegionBlockCacheRequest(hris),
3845          (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
3846          resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
3847      .serverName(serverName).call();
3848  }
3849
3850  @Override
3851  public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
3852    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3853      .action((controller, stub) -> this.<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse,
3854        Boolean> call(controller, stub,
3855          SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(),
3856          (s, c, req, done) -> s.switchRpcThrottle(c, req, done),
3857          resp -> resp.getPreviousRpcThrottleEnabled()))
3858      .call();
3859    return future;
3860  }
3861
3862  @Override
3863  public CompletableFuture<Boolean> isRpcThrottleEnabled() {
3864    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3865      .action((controller, stub) -> this.<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse,
3866        Boolean> call(controller, stub, IsRpcThrottleEnabledRequest.newBuilder().build(),
3867          (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done),
3868          resp -> resp.getRpcThrottleEnabled()))
3869      .call();
3870    return future;
3871  }
3872
3873  @Override
3874  public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) {
3875    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3876      .action((controller, stub) -> this.<SwitchExceedThrottleQuotaRequest,
3877        SwitchExceedThrottleQuotaResponse, Boolean> call(controller, stub,
3878          SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable)
3879            .build(),
3880          (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done),
3881          resp -> resp.getPreviousExceedThrottleQuotaEnabled()))
3882      .call();
3883    return future;
3884  }
3885
3886  @Override
3887  public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
3888    return this.<Map<TableName, Long>> newMasterCaller()
3889      .action((controller, stub) -> this.<GetSpaceQuotaRegionSizesRequest,
3890        GetSpaceQuotaRegionSizesResponse, Map<TableName, Long>> call(controller, stub,
3891          RequestConverter.buildGetSpaceQuotaRegionSizesRequest(),
3892          (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done),
3893          resp -> resp.getSizesList().stream().collect(Collectors
3894            .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize))))
3895      .call();
3896  }
3897
3898  @Override
3899  public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>>
3900    getRegionServerSpaceQuotaSnapshots(ServerName serverName) {
3901    return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller()
3902      .action((controller, stub) -> this.<GetSpaceQuotaSnapshotsRequest,
3903        GetSpaceQuotaSnapshotsResponse, Map<TableName, SpaceQuotaSnapshot>> adminCall(controller,
3904          stub, RequestConverter.buildGetSpaceQuotaSnapshotsRequest(),
3905          (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done),
3906          resp -> resp.getSnapshotsList().stream()
3907            .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()),
3908              snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot())))))
3909      .serverName(serverName).call();
3910  }
3911
3912  private CompletableFuture<SpaceQuotaSnapshot>
3913    getCurrentSpaceQuotaSnapshot(Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) {
3914    return this.<SpaceQuotaSnapshot> newMasterCaller()
3915      .action((controller, stub) -> this.<GetQuotaStatesRequest, GetQuotaStatesResponse,
3916        SpaceQuotaSnapshot> call(controller, stub, RequestConverter.buildGetQuotaStatesRequest(),
3917          (s, c, req, done) -> s.getQuotaStates(c, req, done), converter))
3918      .call();
3919  }
3920
3921  @Override
3922  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) {
3923    return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream()
3924      .filter(s -> s.getNamespace().equals(namespace)).findFirst()
3925      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3926  }
3927
3928  @Override
3929  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) {
3930    HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName);
3931    return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream()
3932      .filter(s -> s.getTableName().equals(protoTableName)).findFirst()
3933      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3934  }
3935
3936  @Override
3937  public CompletableFuture<Void> grant(UserPermission userPermission,
3938    boolean mergeExistingPermissions) {
3939    return this.<Void> newMasterCaller()
3940      .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller, stub,
3941        ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions),
3942        (s, c, req, done) -> s.grant(c, req, done), resp -> null))
3943      .call();
3944  }
3945
3946  @Override
3947  public CompletableFuture<Void> revoke(UserPermission userPermission) {
3948    return this.<Void> newMasterCaller()
3949      .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller,
3950        stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission),
3951        (s, c, req, done) -> s.revoke(c, req, done), resp -> null))
3952      .call();
3953  }
3954
3955  @Override
3956  public CompletableFuture<List<UserPermission>>
3957    getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) {
3958    return this.<List<UserPermission>> newMasterCaller()
3959      .action((controller, stub) -> this.<AccessControlProtos.GetUserPermissionsRequest,
3960        GetUserPermissionsResponse, List<UserPermission>> call(controller, stub,
3961          ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest),
3962          (s, c, req, done) -> s.getUserPermissions(c, req, done),
3963          resp -> resp.getUserPermissionList().stream()
3964            .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm))
3965            .collect(Collectors.toList())))
3966      .call();
3967  }
3968
3969  @Override
3970  public CompletableFuture<List<Boolean>> hasUserPermissions(String userName,
3971    List<Permission> permissions) {
3972    return this.<List<Boolean>> newMasterCaller()
3973      .action((controller, stub) -> this.<HasUserPermissionsRequest, HasUserPermissionsResponse,
3974        List<Boolean>> call(controller, stub,
3975          ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions),
3976          (s, c, req, done) -> s.hasUserPermissions(c, req, done),
3977          resp -> resp.getHasUserPermissionList()))
3978      .call();
3979  }
3980
3981  @Override
3982  public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on, final boolean sync) {
3983    return this.<Boolean> newMasterCaller()
3984      .action((controller, stub) -> this.call(controller, stub,
3985        RequestConverter.buildSetSnapshotCleanupRequest(on, sync),
3986        MasterService.Interface::switchSnapshotCleanup,
3987        SetSnapshotCleanupResponse::getPrevSnapshotCleanup))
3988      .call();
3989  }
3990
3991  @Override
3992  public CompletableFuture<Boolean> isSnapshotCleanupEnabled() {
3993    return this.<Boolean> newMasterCaller()
3994      .action((controller, stub) -> this.call(controller, stub,
3995        RequestConverter.buildIsSnapshotCleanupEnabledRequest(),
3996        MasterService.Interface::isSnapshotCleanupEnabled,
3997        IsSnapshotCleanupEnabledResponse::getEnabled))
3998      .call();
3999  }
4000
4001  @Override
4002  public CompletableFuture<Void> moveServersToRSGroup(Set<Address> servers, String groupName) {
4003    return this.<Void> newMasterCaller()
4004      .action((controller, stub) -> this.<MoveServersRequest, MoveServersResponse, Void> call(
4005        controller, stub, RequestConverter.buildMoveServersRequest(servers, groupName),
4006        (s, c, req, done) -> s.moveServers(c, req, done), resp -> null))
4007      .call();
4008  }
4009
4010  @Override
4011  public CompletableFuture<Void> addRSGroup(String groupName) {
4012    return this.<Void> newMasterCaller()
4013      .action(
4014        ((controller, stub) -> this.<AddRSGroupRequest, AddRSGroupResponse, Void> call(controller,
4015          stub, AddRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
4016          (s, c, req, done) -> s.addRSGroup(c, req, done), resp -> null)))
4017      .call();
4018  }
4019
4020  @Override
4021  public CompletableFuture<Void> removeRSGroup(String groupName) {
4022    return this.<Void> newMasterCaller()
4023      .action((controller, stub) -> this.<RemoveRSGroupRequest, RemoveRSGroupResponse, Void> call(
4024        controller, stub, RemoveRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
4025        (s, c, req, done) -> s.removeRSGroup(c, req, done), resp -> null))
4026      .call();
4027  }
4028
4029  @Override
4030  public CompletableFuture<BalanceResponse> balanceRSGroup(String groupName,
4031    BalanceRequest request) {
4032    return this.<BalanceResponse> newMasterCaller()
4033      .action((controller, stub) -> this.<BalanceRSGroupRequest, BalanceRSGroupResponse,
4034        BalanceResponse> call(controller, stub,
4035          ProtobufUtil.createBalanceRSGroupRequest(groupName, request),
4036          MasterService.Interface::balanceRSGroup, ProtobufUtil::toBalanceResponse))
4037      .call();
4038  }
4039
4040  @Override
4041  public CompletableFuture<List<RSGroupInfo>> listRSGroups() {
4042    return this.<List<RSGroupInfo>> newMasterCaller()
4043      .action((controller, stub) -> this.<ListRSGroupInfosRequest, ListRSGroupInfosResponse,
4044        List<RSGroupInfo>> call(controller, stub, ListRSGroupInfosRequest.getDefaultInstance(),
4045          (s, c, req, done) -> s.listRSGroupInfos(c, req, done), resp -> resp.getRSGroupInfoList()
4046            .stream().map(r -> ProtobufUtil.toGroupInfo(r)).collect(Collectors.toList())))
4047      .call();
4048  }
4049
4050  private CompletableFuture<List<LogEntry>> getSlowLogResponses(
4051    final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit,
4052    final String logType) {
4053    if (CollectionUtils.isEmpty(serverNames)) {
4054      return CompletableFuture.completedFuture(Collections.emptyList());
4055    }
4056    return CompletableFuture.supplyAsync(() -> serverNames
4057      .stream().map((ServerName serverName) -> getSlowLogResponseFromServer(serverName,
4058        filterParams, limit, logType))
4059      .map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList()));
4060  }
4061
4062  private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName,
4063    Map<String, Object> filterParams, int limit, String logType) {
4064    return this.<List<LogEntry>> newAdminCaller()
4065      .action((controller, stub) -> this.adminCall(controller, stub,
4066        RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType),
4067        AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads))
4068      .serverName(serverName).call();
4069  }
4070
4071  @Override
4072  public CompletableFuture<List<Boolean>>
4073    clearSlowLogResponses(@Nullable Set<ServerName> serverNames) {
4074    if (CollectionUtils.isEmpty(serverNames)) {
4075      return CompletableFuture.completedFuture(Collections.emptyList());
4076    }
4077    List<CompletableFuture<Boolean>> clearSlowLogResponseList =
4078      serverNames.stream().map(this::clearSlowLogsResponses).collect(Collectors.toList());
4079    return convertToFutureOfList(clearSlowLogResponseList);
4080  }
4081
4082  private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) {
4083    return this.<Boolean> newAdminCaller()
4084      .action(((controller, stub) -> this.adminCall(controller, stub,
4085        RequestConverter.buildClearSlowLogResponseRequest(),
4086        AdminService.Interface::clearSlowLogsResponses, ProtobufUtil::toClearSlowLogPayload)))
4087      .serverName(serverName).call();
4088  }
4089
4090  private static <T> CompletableFuture<List<T>>
4091    convertToFutureOfList(List<CompletableFuture<T>> futures) {
4092    CompletableFuture<Void> allDoneFuture =
4093      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
4094    return allDoneFuture
4095      .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
4096  }
4097
4098  @Override
4099  public CompletableFuture<List<TableName>> listTablesInRSGroup(String groupName) {
4100    return this.<List<TableName>> newMasterCaller()
4101      .action((controller, stub) -> this.<ListTablesInRSGroupRequest, ListTablesInRSGroupResponse,
4102        List<TableName>> call(controller, stub,
4103          ListTablesInRSGroupRequest.newBuilder().setGroupName(groupName).build(),
4104          (s, c, req, done) -> s.listTablesInRSGroup(c, req, done), resp -> resp.getTableNameList()
4105            .stream().map(ProtobufUtil::toTableName).collect(Collectors.toList())))
4106      .call();
4107  }
4108
4109  @Override
4110  public CompletableFuture<Pair<List<String>, List<TableName>>>
4111    getConfiguredNamespacesAndTablesInRSGroup(String groupName) {
4112    return this.<Pair<List<String>, List<TableName>>> newMasterCaller()
4113      .action((controller, stub) -> this.<GetConfiguredNamespacesAndTablesInRSGroupRequest,
4114        GetConfiguredNamespacesAndTablesInRSGroupResponse,
4115        Pair<List<String>, List<TableName>>> call(controller, stub,
4116          GetConfiguredNamespacesAndTablesInRSGroupRequest.newBuilder().setGroupName(groupName)
4117            .build(),
4118          (s, c, req, done) -> s.getConfiguredNamespacesAndTablesInRSGroup(c, req, done),
4119          resp -> Pair.newPair(resp.getNamespaceList(), resp.getTableNameList().stream()
4120            .map(ProtobufUtil::toTableName).collect(Collectors.toList()))))
4121      .call();
4122  }
4123
4124  @Override
4125  public CompletableFuture<RSGroupInfo> getRSGroup(Address hostPort) {
4126    return this.<RSGroupInfo> newMasterCaller()
4127      .action(
4128        ((controller, stub) -> this.<GetRSGroupInfoOfServerRequest, GetRSGroupInfoOfServerResponse,
4129          RSGroupInfo> call(controller, stub, GetRSGroupInfoOfServerRequest.newBuilder()
4130            .setServer(HBaseProtos.ServerName.newBuilder().setHostName(hostPort.getHostname())
4131              .setPort(hostPort.getPort()).build())
4132            .build(), (s, c, req, done) -> s.getRSGroupInfoOfServer(c, req, done),
4133            resp -> resp.hasRSGroupInfo()
4134              ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo())
4135              : null)))
4136      .call();
4137  }
4138
4139  @Override
4140  public CompletableFuture<Void> removeServersFromRSGroup(Set<Address> servers) {
4141    return this.<Void> newMasterCaller()
4142      .action((controller, stub) -> this.<RemoveServersRequest, RemoveServersResponse, Void> call(
4143        controller, stub, RequestConverter.buildRemoveServersRequest(servers),
4144        (s, c, req, done) -> s.removeServers(c, req, done), resp -> null))
4145      .call();
4146  }
4147
4148  @Override
4149  public CompletableFuture<Void> setRSGroup(Set<TableName> tables, String groupName) {
4150    CompletableFuture<Void> future = new CompletableFuture<>();
4151    for (TableName tableName : tables) {
4152      addListener(tableExists(tableName), (exist, err) -> {
4153        if (err != null) {
4154          future.completeExceptionally(err);
4155          return;
4156        }
4157        if (!exist) {
4158          future.completeExceptionally(new TableNotFoundException(tableName));
4159          return;
4160        }
4161      });
4162    }
4163    addListener(listTableDescriptors(new ArrayList<>(tables)), ((tableDescriptions, err) -> {
4164      if (err != null) {
4165        future.completeExceptionally(err);
4166        return;
4167      }
4168      if (tableDescriptions == null || tableDescriptions.isEmpty()) {
4169        future.complete(null);
4170        return;
4171      }
4172      List<TableDescriptor> newTableDescriptors = new ArrayList<>();
4173      for (TableDescriptor td : tableDescriptions) {
4174        newTableDescriptors
4175          .add(TableDescriptorBuilder.newBuilder(td).setRegionServerGroup(groupName).build());
4176      }
4177      addListener(
4178        CompletableFuture.allOf(
4179          newTableDescriptors.stream().map(this::modifyTable).toArray(CompletableFuture[]::new)),
4180        (v, e) -> {
4181          if (e != null) {
4182            future.completeExceptionally(e);
4183          } else {
4184            future.complete(v);
4185          }
4186        });
4187    }));
4188    return future;
4189  }
4190
4191  @Override
4192  public CompletableFuture<RSGroupInfo> getRSGroup(TableName table) {
4193    return this.<RSGroupInfo> newMasterCaller()
4194      .action(((controller, stub) -> this.<GetRSGroupInfoOfTableRequest,
4195        GetRSGroupInfoOfTableResponse, RSGroupInfo> call(controller, stub,
4196          GetRSGroupInfoOfTableRequest.newBuilder()
4197            .setTableName(ProtobufUtil.toProtoTableName(table)).build(),
4198          (s, c, req, done) -> s.getRSGroupInfoOfTable(c, req, done),
4199          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)))
4200      .call();
4201  }
4202
4203  @Override
4204  public CompletableFuture<RSGroupInfo> getRSGroup(String groupName) {
4205    return this.<RSGroupInfo> newMasterCaller()
4206      .action(((controller, stub) -> this.<GetRSGroupInfoRequest, GetRSGroupInfoResponse,
4207        RSGroupInfo> call(controller, stub,
4208          GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build(),
4209          (s, c, req, done) -> s.getRSGroupInfo(c, req, done),
4210          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)))
4211      .call();
4212  }
4213
4214  @Override
4215  public CompletableFuture<Void> renameRSGroup(String oldName, String newName) {
4216    return this.<Void> newMasterCaller()
4217      .action(((controller, stub) -> this.<RenameRSGroupRequest, RenameRSGroupResponse, Void> call(
4218        controller, stub, RenameRSGroupRequest.newBuilder().setOldRsgroupName(oldName)
4219          .setNewRsgroupName(newName).build(),
4220        (s, c, req, done) -> s.renameRSGroup(c, req, done), resp -> null)))
4221      .call();
4222  }
4223
4224  @Override
4225  public CompletableFuture<Void> updateRSGroupConfig(String groupName,
4226    Map<String, String> configuration) {
4227    UpdateRSGroupConfigRequest.Builder request =
4228      UpdateRSGroupConfigRequest.newBuilder().setGroupName(groupName);
4229    if (configuration != null) {
4230      configuration.entrySet().forEach(e -> request.addConfiguration(
4231        NameStringPair.newBuilder().setName(e.getKey()).setValue(e.getValue()).build()));
4232    }
4233    return this.<Void> newMasterCaller()
4234      .action(((controller, stub) -> this.<UpdateRSGroupConfigRequest, UpdateRSGroupConfigResponse,
4235        Void> call(controller, stub, request.build(),
4236          (s, c, req, done) -> s.updateRSGroupConfig(c, req, done), resp -> null)))
4237      .call();
4238  }
4239
4240  private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) {
4241    return this.<List<LogEntry>> newMasterCaller()
4242      .action((controller, stub) -> this.call(controller, stub,
4243        ProtobufUtil.toBalancerDecisionRequest(limit), MasterService.Interface::getLogEntries,
4244        ProtobufUtil::toBalancerDecisionResponse))
4245      .call();
4246  }
4247
4248  private CompletableFuture<List<LogEntry>> getBalancerRejections(final int limit) {
4249    return this.<List<LogEntry>> newMasterCaller()
4250      .action((controller, stub) -> this.call(controller, stub,
4251        ProtobufUtil.toBalancerRejectionRequest(limit), MasterService.Interface::getLogEntries,
4252        ProtobufUtil::toBalancerRejectionResponse))
4253      .call();
4254  }
4255
4256  @Override
4257  public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames,
4258    String logType, ServerType serverType, int limit, Map<String, Object> filterParams) {
4259    if (logType == null || serverType == null) {
4260      throw new IllegalArgumentException("logType and/or serverType cannot be empty");
4261    }
4262    switch (logType) {
4263      case "SLOW_LOG":
4264      case "LARGE_LOG":
4265        if (ServerType.MASTER.equals(serverType)) {
4266          throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster");
4267        }
4268        return getSlowLogResponses(filterParams, serverNames, limit, logType);
4269      case "BALANCER_DECISION":
4270        if (ServerType.REGION_SERVER.equals(serverType)) {
4271          throw new IllegalArgumentException(
4272            "Balancer Decision logs are not maintained by HRegionServer");
4273        }
4274        return getBalancerDecisions(limit);
4275      case "BALANCER_REJECTION":
4276        if (ServerType.REGION_SERVER.equals(serverType)) {
4277          throw new IllegalArgumentException(
4278            "Balancer Rejection logs are not maintained by HRegionServer");
4279        }
4280        return getBalancerRejections(limit);
4281      default:
4282        return CompletableFuture.completedFuture(Collections.emptyList());
4283    }
4284  }
4285
4286  @Override
4287  public CompletableFuture<Void> flushMasterStore() {
4288    FlushMasterStoreRequest.Builder request = FlushMasterStoreRequest.newBuilder();
4289    return this.<Void> newMasterCaller()
4290      .action(((controller, stub) -> this.<FlushMasterStoreRequest, FlushMasterStoreResponse,
4291        Void> call(controller, stub, request.build(),
4292          (s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null)))
4293      .call();
4294  }
4295}