Skip to content

Commit

Permalink
Merge branch 'master' into renovate/hivemq-edge/master-npm-patch
Browse files Browse the repository at this point in the history
  • Loading branch information
vanch3d authored Aug 6, 2024
2 parents 31b40fd + 71ade83 commit 86e1ae9
Show file tree
Hide file tree
Showing 13 changed files with 641 additions and 437 deletions.
28 changes: 28 additions & 0 deletions .github/workflows/etherip-package.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
name: Publish EtherIP

on:
workflow_dispatch:

concurrency:
group: ${{ github.ref }}-check
cancel-in-progress: true
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
jobs:
publish:
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
steps:
- name: Checkout
uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4
with:
repository: 'ornl-epics/etherip'
- name: Setup Java
uses: actions/setup-java@99b8673ff64fbf99d8d325f52d9a5bdedb8483e9 # v4
with:
distribution: 'adopt'
java-version: '11'
- name: Build and Publish
run: mvn package deploy:deploy-file -Durl=https://maven.pkg.github.com/hivemq/etherip -DrepositoryId=github -Dfile=target/etherip-1.0.0.jar -DpomFile=pom.xml -Dtoken=GH_TOKEN

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
/*
* Copyright 2019-present HiveMQ GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hivemq.edge.modules.adapters.impl.polling;

import com.hivemq.adapter.sdk.api.events.EventService;
import com.hivemq.adapter.sdk.api.events.model.Event;
import com.hivemq.configuration.service.InternalConfigurations;
import com.hivemq.edge.modules.api.adapters.ProtocolAdapterPollingSampler;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.util.ExceptionUtils;
import com.hivemq.util.NanoTimeProvider;
import org.jetbrains.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class PollingTask implements Runnable {

private static final @NotNull Logger log = LoggerFactory.getLogger(PollingTask.class);

private final @NotNull ProtocolAdapterPollingSampler sampler;
private final @NotNull ScheduledExecutorService scheduledExecutorService;
private final @NotNull EventService eventService;
private final @NotNull NanoTimeProvider nanoTimeProvider;
private final @NotNull AtomicInteger watchdogErrorCount = new AtomicInteger();
private final @NotNull AtomicInteger applicationErrorCount = new AtomicInteger();

private volatile long nanosOfLastPolling;
private final @NotNull AtomicBoolean continueScheduling = new AtomicBoolean(true);


public PollingTask(
final @NotNull ProtocolAdapterPollingSampler sampler,
final @NotNull ScheduledExecutorService scheduledExecutorService,
final @NotNull EventService eventService,
final @NotNull NanoTimeProvider nanoTimeProvider) {
this.sampler = sampler;
this.scheduledExecutorService = scheduledExecutorService;
this.eventService = eventService;
this.nanoTimeProvider = nanoTimeProvider;
}

@Override
public void run() {
try {
nanosOfLastPolling = nanoTimeProvider.nanoTime();
if (!continueScheduling.get()) {
return;
}
final CompletableFuture<?> localExecutionFuture = sampler.execute()
.orTimeout(InternalConfigurations.ADAPTER_RUNTIME_JOB_EXECUTION_TIMEOUT_MILLIS.get(),
TimeUnit.MILLISECONDS);
localExecutionFuture.whenComplete((aVoid, throwable) -> {
if (throwable == null) {
resetErrorStats();
reschedule(0);
} else {
if (ExceptionUtils.isInterruptedException(throwable)) {
handleInterruptionException(throwable);
} else {
handleExceptionDuringPolling(throwable);
}
}
});
} catch (Throwable t) {
// the sampler shouldn't throw a exception, but better safe than sorry as we might to miss rescheduling the task otherwise.
handleExceptionDuringPolling(t);
}
}

public void stopScheduling() {
continueScheduling.set(false);
}

private void handleInterruptionException(final @NotNull Throwable throwable) {
//-- Job was killed by the framework as it took too long
//-- Do not call back to the job here (notify) since it will
//-- Not respond and we dont want to block other polls
int errorCountTotal = watchdogErrorCount.incrementAndGet();
boolean stopBecauseOfTooManyErrors =
errorCountTotal > InternalConfigurations.ADAPTER_RUNTIME_WATCHDOG_TIMEOUT_ERRORS_BEFORE_INTERRUPT.get();
final long milliSecondsSinceLastPoll = TimeUnit.NANOSECONDS.toMillis(nanoTimeProvider.nanoTime() - nanosOfLastPolling);
if (stopBecauseOfTooManyErrors) {
log.warn(
"Detected bad system process {} in sampler {} - terminating process to maintain health ({}ms runtime)",
errorCountTotal,
sampler.getAdapterId(),
milliSecondsSinceLastPoll);
} else {
if (log.isDebugEnabled()) {
log.debug(
"Detected bad system process {} in sampler {} - interrupted process to maintain health ({}ms runtime)",
errorCountTotal,
sampler.getAdapterId(),
milliSecondsSinceLastPoll);
}
}
notifyOnError(sampler, throwable, !stopBecauseOfTooManyErrors);
if (!stopBecauseOfTooManyErrors) {
reschedule(errorCountTotal);
}
}


private void handleExceptionDuringPolling(final @NotNull Throwable throwable) {
int errorCountTotal = applicationErrorCount.incrementAndGet();
final int maxErrorsBeforeRemoval = sampler.getMaxErrorsBeforeRemoval();
// case 1: Unlimited retry (maxErrorsBeforeRemoval < 0) or less errors than the limit
if (maxErrorsBeforeRemoval < 0 || errorCountTotal <= maxErrorsBeforeRemoval) {
if (log.isDebugEnabled()) {
log.debug("Application Error {} in sampler {} -> {}",
errorCountTotal,
sampler.getAdapterId(),
throwable.getMessage(),
throwable);
}
reschedule(errorCountTotal);
notifyOnError(sampler, throwable, true);
} else {
log.info(
"Detected '{}' recent errors when sampling. This exceeds the configured limit of '{}'. Sampling for adapter with id '{}' gets stopped.",
errorCountTotal,
maxErrorsBeforeRemoval,
sampler.getAdapterId());
notifyOnError(sampler, throwable, false);
// no rescheduling
}

}

private void notifyOnError(
final @NotNull ProtocolAdapterPollingSampler sampler, final @NotNull Throwable t, boolean continuing) {
try {
sampler.error(t, continuing);
} catch (Throwable samplerError) {
if (log.isInfoEnabled()) {
log.info("Sampler Encountered Error In Notification", samplerError);
}
}
}

private void reschedule(int errorCountTotal) {
long pollDuration = TimeUnit.NANOSECONDS.toMillis(nanoTimeProvider.nanoTime() - nanosOfLastPolling);
final long delayInMillis = sampler.getPeriod() - pollDuration;
// a negative delay means that the last polling attempt took longer to be processed than the specified delay between polls
if (delayInMillis < 0) {
log.warn(
"Polling for protocol adapter '{}' can not keep up with the specified '{}' interval, because the polling takes too long.",
sampler.getAdapterId(),
sampler.getPeriod());
eventService.createAdapterEvent(sampler.getAdapterId(), sampler.getProtocolId())
.withMessage(String.format(
"Polling for protocol adapter '%s' can not keep up with the specified '%d' ms interval, because the polling takes too long.",
sampler.getAdapterId(),
sampler.getPeriod()))
.withSeverity(Event.SEVERITY.WARN)
.fire();
}

long nonNegativeDelay = Math.max(0, delayInMillis);

if (errorCountTotal == 0) {
schedule(nonNegativeDelay);
} else {
long backoff = getBackoff(errorCountTotal,
InternalConfigurations.ADAPTER_RUNTIME_MAX_APPLICATION_ERROR_BACKOFF.get());
long effectiveDelay = Math.max(nonNegativeDelay, backoff);
schedule(effectiveDelay);
}
}

@VisibleForTesting
void schedule(long nonNegativeDelay) {
if (continueScheduling.get()) {
try {
scheduledExecutorService.schedule(this, nonNegativeDelay, TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException rejectedExecutionException) {
// ignore. This is fine during shutdown.
}
}
}

private void resetErrorStats() {
applicationErrorCount.set(0);
watchdogErrorCount.set(0);
}

private static long getBackoff(int errorCount, long max) {
//-- This will backoff up to a max of about a day (unless the max provided is less)
long f = (long) (Math.pow(2, Math.min(errorCount, 20)) * 100);
f += ThreadLocalRandom.current().nextInt(0, errorCount * 100);
f = Math.min(f, max);
return f;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2019-present HiveMQ GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hivemq.edge.modules.adapters.impl.polling;

import com.hivemq.adapter.sdk.api.ProtocolAdapter;
import com.hivemq.adapter.sdk.api.events.EventService;
import com.hivemq.common.shutdown.HiveMQShutdownHook;
import com.hivemq.common.shutdown.ShutdownHooks;
import com.hivemq.edge.modules.api.adapters.ProtocolAdapterPollingSampler;
import com.hivemq.edge.modules.api.adapters.ProtocolAdapterPollingService;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.util.NanoTimeProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* @author Daniel Krüger
*/
@Singleton
public class ProtocolAdapterPollingServiceImpl implements ProtocolAdapterPollingService {

private static final @NotNull Logger log = LoggerFactory.getLogger(ProtocolAdapterPollingServiceImpl.class);

private final @NotNull ScheduledExecutorService scheduledExecutorService;
private final @NotNull EventService eventService;
private final @NotNull NanoTimeProvider nanoTimeProvider;
private final @NotNull Map<ProtocolAdapterPollingSampler, PollingTask> samplerToTask = new ConcurrentHashMap<>();

@Inject
public ProtocolAdapterPollingServiceImpl(
final @NotNull ScheduledExecutorService scheduledExecutorService,
final @NotNull ShutdownHooks shutdownHooks,
final @NotNull EventService eventService,
final @NotNull NanoTimeProvider nanoTimeProvider) {
this.scheduledExecutorService = scheduledExecutorService;
this.eventService = eventService;
this.nanoTimeProvider = nanoTimeProvider;
shutdownHooks.add(new Shutdown());
}

@Override
public void schedulePolling(
@NotNull final ProtocolAdapter adapter, @NotNull final ProtocolAdapterPollingSampler sampler) {
final PollingTask pollingTask = new PollingTask(sampler, scheduledExecutorService, eventService, nanoTimeProvider);
scheduledExecutorService.schedule(pollingTask, sampler.getInitialDelay(), sampler.getUnit());
samplerToTask.put(sampler, pollingTask);
}

@Override
public void stopPollingForAdapterInstance(@NotNull final ProtocolAdapter adapter) {
samplerToTask.keySet()
.stream()
.filter(p -> p.getAdapterId().equals(adapter.getId()))
.forEach(this::stopPolling);
}

private void stopPolling(final @NotNull ProtocolAdapterPollingSampler sampler) {
final PollingTask taskToStop = samplerToTask.remove(sampler);
taskToStop.stopScheduling();
}

public void stopAllPolling() {
samplerToTask.keySet().forEach(this::stopPolling);
}


private class Shutdown implements HiveMQShutdownHook {
@Override
public @NotNull String name() {
return "Protocol Adapter Polling Service ShutDown";
}

@Override
public void run() {
stopAllPolling();
if (!scheduledExecutorService.isShutdown()) {
try {
scheduledExecutorService.shutdown();
if (!scheduledExecutorService.awaitTermination(10, TimeUnit.SECONDS)) {
scheduledExecutorService.shutdownNow();
}
} catch (InterruptedException e) {
log.warn("Error Encountered Attempting to Shutdown Adapter Polling Service", e);
}
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;

import static com.hivemq.adapter.sdk.api.state.ProtocolAdapterState.ConnectionStatus.STATELESS;
Expand Down Expand Up @@ -82,7 +81,7 @@ public void poll(
final int minDelay = adapterConfig.getMinDelay();
final int maxDelay = adapterConfig.getMaxDelay();

CompletableFuture.runAsync(() -> {
new Thread(() -> {
if (minDelay > maxDelay) {
pollingOutput.fail(String.format(
"The configured min '%d' delay was bigger than the max delay '%d'. Simulator Adapter will not publish a value.",
Expand Down Expand Up @@ -112,7 +111,7 @@ public void poll(
.nextDouble(Math.min(adapterConfig.getMinValue(), adapterConfig.getMaxValue()),
Math.max(adapterConfig.getMinValue() + 1, adapterConfig.getMaxValue())));
pollingOutput.finish();
});
}).start();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public interface ProtocolAdapterPollingSampler {
@NotNull UUID getId();
@NotNull Date getCreated();
@NotNull String getAdapterId();
@NotNull String getProtocolId();

default @NotNull String getReferenceId(){
return String.format("%s:%s", getAdapterId(), getId());
Expand Down
Loading

0 comments on commit 86e1ae9

Please sign in to comment.