001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.region;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.net.InetAddress;
026import java.security.cert.X509Certificate;
027import java.util.HashSet;
028import java.util.Map;
029import java.util.Optional;
030import java.util.Set;
031import org.apache.hadoop.hbase.CellScanner;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.client.Get;
034import org.apache.hadoop.hbase.ipc.RpcCall;
035import org.apache.hadoop.hbase.ipc.RpcCallback;
036import org.apache.hadoop.hbase.ipc.RpcServer;
037import org.apache.hadoop.hbase.procedure2.Procedure;
038import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
039import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
040import org.apache.hadoop.hbase.security.User;
041import org.apache.hadoop.hbase.testclassification.MasterTests;
042import org.apache.hadoop.hbase.testclassification.SmallTests;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
045import org.junit.ClassRule;
046import org.junit.Test;
047import org.junit.experimental.categories.Category;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
052import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
053import org.apache.hbase.thirdparty.com.google.protobuf.Message;
054
055import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
056import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
057
058@Category({ MasterTests.class, SmallTests.class })
059public class TestRegionProcedureStore extends RegionProcedureStoreTestBase {
060
061  @ClassRule
062  public static final HBaseClassTestRule CLASS_RULE =
063    HBaseClassTestRule.forClass(TestRegionProcedureStore.class);
064
065  private static final Logger LOG = LoggerFactory.getLogger(TestRegionProcedureStore.class);
066
067  private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception {
068    LOG.debug("expected: " + procIds);
069    LoadCounter loader = new LoadCounter();
070    ProcedureTestingUtility.storeRestart(store, true, loader);
071    assertEquals(procIds.size(), loader.getLoadedCount());
072    assertEquals(0, loader.getCorruptedCount());
073  }
074
075  @Test
076  public void testLoad() throws Exception {
077    Set<Long> procIds = new HashSet<>();
078
079    // Insert something in the log
080    RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure();
081    procIds.add(proc1.getProcId());
082    store.insert(proc1, null);
083
084    RegionProcedureStoreTestProcedure proc2 = new RegionProcedureStoreTestProcedure();
085    RegionProcedureStoreTestProcedure proc3 = new RegionProcedureStoreTestProcedure();
086    proc3.setParent(proc2);
087    RegionProcedureStoreTestProcedure proc4 = new RegionProcedureStoreTestProcedure();
088    proc4.setParent(proc2);
089
090    procIds.add(proc2.getProcId());
091    procIds.add(proc3.getProcId());
092    procIds.add(proc4.getProcId());
093    store.insert(proc2, new Procedure[] { proc3, proc4 });
094
095    // Verify that everything is there
096    verifyProcIdsOnRestart(procIds);
097
098    // Update and delete something
099    proc1.finish();
100    store.update(proc1);
101    proc4.finish();
102    store.update(proc4);
103    store.delete(proc4.getProcId());
104    procIds.remove(proc4.getProcId());
105
106    // Verify that everything is there
107    verifyProcIdsOnRestart(procIds);
108  }
109
110  @Test
111  public void testCleanup() throws Exception {
112    RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure();
113    store.insert(proc1, null);
114    RegionProcedureStoreTestProcedure proc2 = new RegionProcedureStoreTestProcedure();
115    store.insert(proc2, null);
116    RegionProcedureStoreTestProcedure proc3 = new RegionProcedureStoreTestProcedure();
117    store.insert(proc3, null);
118    LoadCounter loader = new LoadCounter();
119    store.load(loader);
120    assertEquals(proc3.getProcId(), loader.getMaxProcId());
121    assertEquals(3, loader.getRunnableCount());
122
123    store.delete(proc3.getProcId());
124    store.delete(proc2.getProcId());
125    loader = new LoadCounter();
126    store.load(loader);
127    assertEquals(proc3.getProcId(), loader.getMaxProcId());
128    assertEquals(1, loader.getRunnableCount());
129
130    // the row should still be there
131    assertTrue(store.region
132      .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
133    assertTrue(store.region
134      .get(new Get(Bytes.toBytes(proc2.getProcId())).setCheckExistenceOnly(true)).getExists());
135
136    // proc2 will be deleted after cleanup, but proc3 should still be there as it holds the max proc
137    // id
138    store.cleanup();
139    assertTrue(store.region
140      .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
141    assertFalse(store.region
142      .get(new Get(Bytes.toBytes(proc2.getProcId())).setCheckExistenceOnly(true)).getExists());
143
144    RegionProcedureStoreTestProcedure proc4 = new RegionProcedureStoreTestProcedure();
145    store.insert(proc4, null);
146    store.cleanup();
147    // proc3 should also be deleted as now proc4 holds the max proc id
148    assertFalse(store.region
149      .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
150  }
151
152  /**
153   * Test for HBASE-23895
154   */
155  @Test
156  public void testInsertWithRpcCall() throws Exception {
157    RpcServer.setCurrentCall(newRpcCallWithDeadline());
158    RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure();
159    store.insert(proc1, null);
160    RpcServer.setCurrentCall(null);
161  }
162
163  @SuppressWarnings("checkstyle:methodlength")
164  private RpcCall newRpcCallWithDeadline() {
165    return new RpcCall() {
166      @Override
167      public long getDeadline() {
168        return EnvironmentEdgeManager.currentTime();
169      }
170
171      @Override
172      public BlockingService getService() {
173        return null;
174      }
175
176      @Override
177      public Descriptors.MethodDescriptor getMethod() {
178        return null;
179      }
180
181      @Override
182      public Message getParam() {
183        return null;
184      }
185
186      @Override
187      public CellScanner getCellScanner() {
188        return null;
189      }
190
191      @Override
192      public long getReceiveTime() {
193        return 0;
194      }
195
196      @Override
197      public long getStartTime() {
198        return 0;
199      }
200
201      @Override
202      public void setStartTime(long startTime) {
203
204      }
205
206      @Override
207      public int getTimeout() {
208        return 0;
209      }
210
211      @Override
212      public int getPriority() {
213        return 0;
214      }
215
216      @Override
217      public long getSize() {
218        return 0;
219      }
220
221      @Override
222      public RPCProtos.RequestHeader getHeader() {
223        return null;
224      }
225
226      @Override
227      public Map<String, byte[]> getConnectionAttributes() {
228        return null;
229      }
230
231      @Override
232      public Map<String, byte[]> getRequestAttributes() {
233        return null;
234      }
235
236      @Override
237      public byte[] getRequestAttribute(String key) {
238        return null;
239      }
240
241      @Override
242      public int getRemotePort() {
243        return 0;
244      }
245
246      @Override
247      public void setResponse(Message param, CellScanner cells, Throwable errorThrowable,
248        String error) {
249      }
250
251      @Override
252      public void sendResponseIfReady() throws IOException {
253      }
254
255      @Override
256      public void cleanup() {
257      }
258
259      @Override
260      public String toShortString() {
261        return null;
262      }
263
264      @Override
265      public long disconnectSince() {
266        return 0;
267      }
268
269      @Override
270      public boolean isClientCellBlockSupported() {
271        return false;
272      }
273
274      @Override
275      public Optional<User> getRequestUser() {
276        return Optional.empty();
277      }
278
279      @Override
280      public Optional<X509Certificate[]> getClientCertificateChain() {
281        return Optional.empty();
282      }
283
284      @Override
285      public InetAddress getRemoteAddress() {
286        return null;
287      }
288
289      @Override
290      public HBaseProtos.VersionInfo getClientVersionInfo() {
291        return null;
292      }
293
294      @Override
295      public void setCallBack(RpcCallback callback) {
296      }
297
298      @Override
299      public boolean isRetryImmediatelySupported() {
300        return false;
301      }
302
303      @Override
304      public long getResponseCellSize() {
305        return 0;
306      }
307
308      @Override
309      public void incrementResponseCellSize(long cellSize) {
310      }
311
312      @Override
313      public long getBlockBytesScanned() {
314        return 0;
315      }
316
317      @Override
318      public void incrementBlockBytesScanned(long blockSize) {
319      }
320
321      @Override
322      public long getResponseExceptionSize() {
323        return 0;
324      }
325
326      @Override
327      public void incrementResponseExceptionSize(long exceptionSize) {
328      }
329
330      @Override
331      public void updateFsReadTime(long latencyMillis) {
332
333      }
334
335      @Override
336      public long getFsReadTime() {
337        return 0;
338      }
339    };
340  }
341}