001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.region;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.io.IOException;
025import java.net.InetAddress;
026import java.security.cert.X509Certificate;
027import java.util.HashSet;
028import java.util.Map;
029import java.util.Optional;
030import java.util.Set;
031import org.apache.hadoop.hbase.ExtendedCellScanner;
032import org.apache.hadoop.hbase.client.Get;
033import org.apache.hadoop.hbase.ipc.RpcCall;
034import org.apache.hadoop.hbase.ipc.RpcCallback;
035import org.apache.hadoop.hbase.ipc.RpcServer;
036import org.apache.hadoop.hbase.procedure2.Procedure;
037import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
038import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
039import org.apache.hadoop.hbase.security.User;
040import org.apache.hadoop.hbase.testclassification.MasterTests;
041import org.apache.hadoop.hbase.testclassification.SmallTests;
042import org.apache.hadoop.hbase.util.Bytes;
043import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
044import org.junit.jupiter.api.Tag;
045import org.junit.jupiter.api.Test;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
050import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
051import org.apache.hbase.thirdparty.com.google.protobuf.Message;
052
053import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
055
056@Tag(SmallTests.TAG)
057@Tag(MasterTests.TAG)
058public class TestRegionProcedureStore extends RegionProcedureStoreTestBase {
059
060  private static final Logger LOG = LoggerFactory.getLogger(TestRegionProcedureStore.class);
061
062  private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception {
063    LOG.debug("expected: " + procIds);
064    LoadCounter loader = new LoadCounter();
065    ProcedureTestingUtility.storeRestart(store, true, loader);
066    assertEquals(procIds.size(), loader.getLoadedCount());
067    assertEquals(0, loader.getCorruptedCount());
068  }
069
070  @Test
071  public void testLoad() throws Exception {
072    Set<Long> procIds = new HashSet<>();
073
074    // Insert something in the log
075    RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure();
076    procIds.add(proc1.getProcId());
077    store.insert(proc1, null);
078
079    RegionProcedureStoreTestProcedure proc2 = new RegionProcedureStoreTestProcedure();
080    RegionProcedureStoreTestProcedure proc3 = new RegionProcedureStoreTestProcedure();
081    proc3.setParent(proc2);
082    RegionProcedureStoreTestProcedure proc4 = new RegionProcedureStoreTestProcedure();
083    proc4.setParent(proc2);
084
085    procIds.add(proc2.getProcId());
086    procIds.add(proc3.getProcId());
087    procIds.add(proc4.getProcId());
088    store.insert(proc2, new Procedure[] { proc3, proc4 });
089
090    // Verify that everything is there
091    verifyProcIdsOnRestart(procIds);
092
093    // Update and delete something
094    proc1.finish();
095    store.update(proc1);
096    proc4.finish();
097    store.update(proc4);
098    store.delete(proc4.getProcId());
099    procIds.remove(proc4.getProcId());
100
101    // Verify that everything is there
102    verifyProcIdsOnRestart(procIds);
103  }
104
105  @Test
106  public void testCleanup() throws Exception {
107    RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure();
108    store.insert(proc1, null);
109    RegionProcedureStoreTestProcedure proc2 = new RegionProcedureStoreTestProcedure();
110    store.insert(proc2, null);
111    RegionProcedureStoreTestProcedure proc3 = new RegionProcedureStoreTestProcedure();
112    store.insert(proc3, null);
113    LoadCounter loader = new LoadCounter();
114    store.load(loader);
115    assertEquals(proc3.getProcId(), loader.getMaxProcId());
116    assertEquals(3, loader.getRunnableCount());
117
118    store.delete(proc3.getProcId());
119    store.delete(proc2.getProcId());
120    loader = new LoadCounter();
121    store.load(loader);
122    assertEquals(proc3.getProcId(), loader.getMaxProcId());
123    assertEquals(1, loader.getRunnableCount());
124
125    // the row should still be there
126    assertTrue(store.region
127      .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
128    assertTrue(store.region
129      .get(new Get(Bytes.toBytes(proc2.getProcId())).setCheckExistenceOnly(true)).getExists());
130
131    // proc2 will be deleted after cleanup, but proc3 should still be there as it holds the max proc
132    // id
133    store.cleanup();
134    assertTrue(store.region
135      .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
136    assertFalse(store.region
137      .get(new Get(Bytes.toBytes(proc2.getProcId())).setCheckExistenceOnly(true)).getExists());
138
139    RegionProcedureStoreTestProcedure proc4 = new RegionProcedureStoreTestProcedure();
140    store.insert(proc4, null);
141    store.cleanup();
142    // proc3 should also be deleted as now proc4 holds the max proc id
143    assertFalse(store.region
144      .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
145  }
146
147  /**
148   * Test for HBASE-23895
149   */
150  @Test
151  public void testInsertWithRpcCall() throws Exception {
152    RpcServer.setCurrentCall(newRpcCallWithDeadline());
153    RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure();
154    store.insert(proc1, null);
155    RpcServer.setCurrentCall(null);
156  }
157
158  @SuppressWarnings("checkstyle:methodlength")
159  private RpcCall newRpcCallWithDeadline() {
160    return new RpcCall() {
161      @Override
162      public long getDeadline() {
163        return EnvironmentEdgeManager.currentTime();
164      }
165
166      @Override
167      public BlockingService getService() {
168        return null;
169      }
170
171      @Override
172      public Descriptors.MethodDescriptor getMethod() {
173        return null;
174      }
175
176      @Override
177      public Message getParam() {
178        return null;
179      }
180
181      @Override
182      public ExtendedCellScanner getCellScanner() {
183        return null;
184      }
185
186      @Override
187      public long getReceiveTime() {
188        return 0;
189      }
190
191      @Override
192      public long getStartTime() {
193        return 0;
194      }
195
196      @Override
197      public void setStartTime(long startTime) {
198
199      }
200
201      @Override
202      public int getTimeout() {
203        return 0;
204      }
205
206      @Override
207      public int getPriority() {
208        return 0;
209      }
210
211      @Override
212      public long getSize() {
213        return 0;
214      }
215
216      @Override
217      public RPCProtos.RequestHeader getHeader() {
218        return null;
219      }
220
221      @Override
222      public Map<String, byte[]> getConnectionAttributes() {
223        return null;
224      }
225
226      @Override
227      public Map<String, byte[]> getRequestAttributes() {
228        return null;
229      }
230
231      @Override
232      public byte[] getRequestAttribute(String key) {
233        return null;
234      }
235
236      @Override
237      public int getRemotePort() {
238        return 0;
239      }
240
241      @Override
242      public void setResponse(Message param, ExtendedCellScanner cells, Throwable errorThrowable,
243        String error) {
244      }
245
246      @Override
247      public void sendResponseIfReady() throws IOException {
248      }
249
250      @Override
251      public void cleanup() {
252      }
253
254      @Override
255      public String toShortString() {
256        return null;
257      }
258
259      @Override
260      public long disconnectSince() {
261        return 0;
262      }
263
264      @Override
265      public boolean isClientCellBlockSupported() {
266        return false;
267      }
268
269      @Override
270      public Optional<User> getRequestUser() {
271        return Optional.empty();
272      }
273
274      @Override
275      public Optional<X509Certificate[]> getClientCertificateChain() {
276        return Optional.empty();
277      }
278
279      @Override
280      public InetAddress getRemoteAddress() {
281        return null;
282      }
283
284      @Override
285      public HBaseProtos.VersionInfo getClientVersionInfo() {
286        return null;
287      }
288
289      @Override
290      public void setCallBack(RpcCallback callback) {
291      }
292
293      @Override
294      public boolean isRetryImmediatelySupported() {
295        return false;
296      }
297
298      @Override
299      public long getResponseCellSize() {
300        return 0;
301      }
302
303      @Override
304      public void incrementResponseCellSize(long cellSize) {
305      }
306
307      @Override
308      public long getBlockBytesScanned() {
309        return 0;
310      }
311
312      @Override
313      public void incrementBlockBytesScanned(long blockSize) {
314      }
315
316      @Override
317      public long getResponseExceptionSize() {
318        return 0;
319      }
320
321      @Override
322      public void incrementResponseExceptionSize(long exceptionSize) {
323      }
324    };
325  }
326}