001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.region;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.net.InetAddress;
026import java.util.HashSet;
027import java.util.Optional;
028import java.util.Set;
029import org.apache.hadoop.hbase.CellScanner;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.client.Get;
032import org.apache.hadoop.hbase.ipc.RpcCall;
033import org.apache.hadoop.hbase.ipc.RpcCallback;
034import org.apache.hadoop.hbase.ipc.RpcServer;
035import org.apache.hadoop.hbase.procedure2.Procedure;
036import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
037import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
038import org.apache.hadoop.hbase.security.User;
039import org.apache.hadoop.hbase.testclassification.MasterTests;
040import org.apache.hadoop.hbase.testclassification.SmallTests;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
043import org.junit.ClassRule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
050import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
051import org.apache.hbase.thirdparty.com.google.protobuf.Message;
052
053import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
055
056@Category({ MasterTests.class, SmallTests.class })
057public class TestRegionProcedureStore extends RegionProcedureStoreTestBase {
058
059  @ClassRule
060  public static final HBaseClassTestRule CLASS_RULE =
061    HBaseClassTestRule.forClass(TestRegionProcedureStore.class);
062
063  private static final Logger LOG = LoggerFactory.getLogger(TestRegionProcedureStore.class);
064
065  private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception {
066    LOG.debug("expected: " + procIds);
067    LoadCounter loader = new LoadCounter();
068    ProcedureTestingUtility.storeRestart(store, true, loader);
069    assertEquals(procIds.size(), loader.getLoadedCount());
070    assertEquals(0, loader.getCorruptedCount());
071  }
072
073  @Test
074  public void testLoad() throws Exception {
075    Set<Long> procIds = new HashSet<>();
076
077    // Insert something in the log
078    RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure();
079    procIds.add(proc1.getProcId());
080    store.insert(proc1, null);
081
082    RegionProcedureStoreTestProcedure proc2 = new RegionProcedureStoreTestProcedure();
083    RegionProcedureStoreTestProcedure proc3 = new RegionProcedureStoreTestProcedure();
084    proc3.setParent(proc2);
085    RegionProcedureStoreTestProcedure proc4 = new RegionProcedureStoreTestProcedure();
086    proc4.setParent(proc2);
087
088    procIds.add(proc2.getProcId());
089    procIds.add(proc3.getProcId());
090    procIds.add(proc4.getProcId());
091    store.insert(proc2, new Procedure[] { proc3, proc4 });
092
093    // Verify that everything is there
094    verifyProcIdsOnRestart(procIds);
095
096    // Update and delete something
097    proc1.finish();
098    store.update(proc1);
099    proc4.finish();
100    store.update(proc4);
101    store.delete(proc4.getProcId());
102    procIds.remove(proc4.getProcId());
103
104    // Verify that everything is there
105    verifyProcIdsOnRestart(procIds);
106  }
107
108  @Test
109  public void testCleanup() throws Exception {
110    RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure();
111    store.insert(proc1, null);
112    RegionProcedureStoreTestProcedure proc2 = new RegionProcedureStoreTestProcedure();
113    store.insert(proc2, null);
114    RegionProcedureStoreTestProcedure proc3 = new RegionProcedureStoreTestProcedure();
115    store.insert(proc3, null);
116    LoadCounter loader = new LoadCounter();
117    store.load(loader);
118    assertEquals(proc3.getProcId(), loader.getMaxProcId());
119    assertEquals(3, loader.getRunnableCount());
120
121    store.delete(proc3.getProcId());
122    store.delete(proc2.getProcId());
123    loader = new LoadCounter();
124    store.load(loader);
125    assertEquals(proc3.getProcId(), loader.getMaxProcId());
126    assertEquals(1, loader.getRunnableCount());
127
128    // the row should still be there
129    assertTrue(store.region
130      .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
131    assertTrue(store.region
132      .get(new Get(Bytes.toBytes(proc2.getProcId())).setCheckExistenceOnly(true)).getExists());
133
134    // proc2 will be deleted after cleanup, but proc3 should still be there as it holds the max proc
135    // id
136    store.cleanup();
137    assertTrue(store.region
138      .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
139    assertFalse(store.region
140      .get(new Get(Bytes.toBytes(proc2.getProcId())).setCheckExistenceOnly(true)).getExists());
141
142    RegionProcedureStoreTestProcedure proc4 = new RegionProcedureStoreTestProcedure();
143    store.insert(proc4, null);
144    store.cleanup();
145    // proc3 should also be deleted as now proc4 holds the max proc id
146    assertFalse(store.region
147      .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
148  }
149
150  /**
151   * Test for HBASE-23895
152   */
153  @Test
154  public void testInsertWithRpcCall() throws Exception {
155    RpcServer.setCurrentCall(newRpcCallWithDeadline());
156    RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure();
157    store.insert(proc1, null);
158    RpcServer.setCurrentCall(null);
159  }
160
161  private RpcCall newRpcCallWithDeadline() {
162    return new RpcCall() {
163      @Override
164      public long getDeadline() {
165        return EnvironmentEdgeManager.currentTime();
166      }
167
168      @Override
169      public BlockingService getService() {
170        return null;
171      }
172
173      @Override
174      public Descriptors.MethodDescriptor getMethod() {
175        return null;
176      }
177
178      @Override
179      public Message getParam() {
180        return null;
181      }
182
183      @Override
184      public CellScanner getCellScanner() {
185        return null;
186      }
187
188      @Override
189      public long getReceiveTime() {
190        return 0;
191      }
192
193      @Override
194      public long getStartTime() {
195        return 0;
196      }
197
198      @Override
199      public void setStartTime(long startTime) {
200
201      }
202
203      @Override
204      public int getTimeout() {
205        return 0;
206      }
207
208      @Override
209      public int getPriority() {
210        return 0;
211      }
212
213      @Override
214      public long getSize() {
215        return 0;
216      }
217
218      @Override
219      public RPCProtos.RequestHeader getHeader() {
220        return null;
221      }
222
223      @Override
224      public int getRemotePort() {
225        return 0;
226      }
227
228      @Override
229      public void setResponse(Message param, CellScanner cells, Throwable errorThrowable,
230        String error) {
231      }
232
233      @Override
234      public void sendResponseIfReady() throws IOException {
235      }
236
237      @Override
238      public void cleanup() {
239      }
240
241      @Override
242      public String toShortString() {
243        return null;
244      }
245
246      @Override
247      public long disconnectSince() {
248        return 0;
249      }
250
251      @Override
252      public boolean isClientCellBlockSupported() {
253        return false;
254      }
255
256      @Override
257      public Optional<User> getRequestUser() {
258        return Optional.empty();
259      }
260
261      @Override
262      public InetAddress getRemoteAddress() {
263        return null;
264      }
265
266      @Override
267      public HBaseProtos.VersionInfo getClientVersionInfo() {
268        return null;
269      }
270
271      @Override
272      public void setCallBack(RpcCallback callback) {
273      }
274
275      @Override
276      public boolean isRetryImmediatelySupported() {
277        return false;
278      }
279
280      @Override
281      public long getResponseCellSize() {
282        return 0;
283      }
284
285      @Override
286      public void incrementResponseCellSize(long cellSize) {
287      }
288
289      @Override
290      public long getResponseBlockSize() {
291        return 0;
292      }
293
294      @Override
295      public void incrementResponseBlockSize(long blockSize) {
296      }
297
298      @Override
299      public long getResponseExceptionSize() {
300        return 0;
301      }
302
303      @Override
304      public void incrementResponseExceptionSize(long exceptionSize) {
305      }
306    };
307  }
308}