Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public class ReconTaskControllerImpl implements ReconTaskController {
private final ReconTaskStatusUpdaterManager taskStatusUpdaterManager;
private final OMUpdateEventBuffer eventBuffer;
private ExecutorService eventProcessingExecutor;
private volatile boolean running = false;
private final AtomicBoolean tasksFailed = new AtomicBoolean(false);
private volatile ReconOMMetadataManager currentOMMetadataManager;
private final OzoneConfiguration configuration;
Expand Down Expand Up @@ -359,6 +360,7 @@ public synchronized void start() {
.build());

// Start async event processing thread
running = true;
eventProcessingExecutor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat("ReconEventProcessor-%d")
.build());
Expand All @@ -369,6 +371,9 @@ public synchronized void start() {
@Override
public synchronized void stop() {
LOG.info("Stopping Recon Task Controller.");
// Signal the event processing loop to exit on its next poll cycle so the
// graceful shutdown below can complete without waiting out the timeout.
running = false;
shutdownExecutorGracefully(this.executorService, "main task executor");
shutdownExecutorGracefully(this.eventProcessingExecutor, "event processing executor");
}
Expand Down Expand Up @@ -481,7 +486,7 @@ private void processTasks(
private void processBufferedEventsAsync() {
LOG.info("Started async buffered event processing thread");

while (!Thread.currentThread().isInterrupted()) {
while (running && !Thread.currentThread().isInterrupted()) {
try {
ReconEvent event = eventBuffer.poll(1000); // 1 second timeout
if (event != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.hadoop.ozone.recon.spi.impl.ReconDBProvider;
import org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdater;
import org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdaterManager;
import org.apache.hadoop.util.Time;
import org.apache.ozone.recon.schema.generated.tables.daos.ReconTaskStatusDao;
import org.apache.ozone.recon.schema.generated.tables.pojos.ReconTaskStatus;
import org.apache.ozone.test.GenericTestUtils;
Expand Down Expand Up @@ -95,6 +96,19 @@ public void setUp() throws IOException {
reconTaskController.start();
}

@Test
public void testStopCompletesPromptly() {
// stop() must not block on the graceful shutdown timeout. The event
// processing loop only exits on interrupt, so a plain shutdown() can never
// drain it and awaitTermination would otherwise burn the full 30s timeout.
long start = Time.monotonicNow();
reconTaskController.stop();
long elapsed = Time.monotonicNow() - start;
assertThat(elapsed)
.as("stop() should return promptly, not wait out the shutdown timeout")
.isLessThan(5000L);
}

@Test
public void testRegisterTask() {
String taskName = "Dummy_" + System.currentTimeMillis();
Expand Down Expand Up @@ -596,9 +610,11 @@ public void testProcessReInitializationEventWithTaskFailuresAndRetry() throws Ex
.thenReturn(false) // First call fails
.thenReturn(true); // Second call succeeds

// Stop async processing to control event processing manually
controllerSpy.stop();

// Stop async processing on the real controller so we can drive event
// processing manually. Stopping controllerSpy would only flip the flag on
// the Mockito copy, not the live event-processing thread.
controllerImpl.stop();

// Create and manually process a reinitialization event
ReconTaskReInitializationEvent reinitEvent = new ReconTaskReInitializationEvent(
ReconTaskReInitializationEvent.ReInitializationReason.TASK_FAILURES,
Expand Down Expand Up @@ -732,9 +748,11 @@ public void testProcessReInitializationEventWithCheckpointedManager() throws Exc
when(controllerSpy.reInitializeTasks(any(ReconOMMetadataManager.class), any()))
.thenReturn(true); // Succeed

// Stop async processing to control event processing manually
controllerSpy.stop();

// Stop async processing on the real controller so we can drive event
// processing manually. Stopping controllerSpy would only flip the flag on
// the Mockito copy, not the live event-processing thread.
controllerImpl.stop();

// Create reinitialization event with checkpointed manager
ReconTaskReInitializationEvent reinitEvent = new ReconTaskReInitializationEvent(
ReconTaskReInitializationEvent.ReInitializationReason.BUFFER_OVERFLOW,
Expand Down
Loading