Skip to content

Commit

Permalink
Merge branch 'master' into feat/error-thread-wfspec
Browse files Browse the repository at this point in the history
  • Loading branch information
KarlaCarvajal authored Feb 19, 2025
2 parents cffb9c1 + 1f4129d commit c6d3846
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
'use client'
import { Diagram } from '@/app/(authenticated)/[tenantId]/(diagram)/components/Diagram'
import { Navigation } from '@/app/(authenticated)/[tenantId]/components/Navigation'
import { useWfRun } from '@/app/hooks/useWfRun'
import { WfRunId, WfRunVariableAccessLevel } from 'littlehorse-client/proto'
import { useSearchParams } from 'next/navigation'
import { FC, useCallback } from 'react'
import { FC } from 'react'
import { Details } from './Details'
import { Variables } from './Variables'
import { useWfRun } from '@/app/hooks/useWfRun'
import { WfRunId, WfRunVariableAccessLevel } from 'littlehorse-client/proto'
import { isExternal } from 'util/types'

export const WfRun: FC<{ wfRunId: WfRunId, tenantId: string }> = ({ wfRunId, tenantId }) => {
export const WfRun: FC<{ ids: string[], tenantId: string }> = ({ ids, tenantId }) => {
const wfRunId = ids.reduce((wfRunId, id, i) => (i === 0 ? { id } : { id, parentWfRunId: wfRunId }), {} as WfRunId);

const searchParams = useSearchParams()
const threadRunNumber = Number(searchParams.get('threadRunNumber'))
const { wfRunData, isLoading, isError } = useWfRun({ wfRunId, tenantId })
const { wfRunData } = useWfRun({ wfRunId, tenantId })
const { wfRunData: parentWfRunData } = useWfRun({ wfRunId: wfRunData?.wfRun?.id?.parentWfRunId ?? { id: '', parentWfRunId: undefined }, tenantId })

if (!wfRunData) return null
Expand All @@ -36,7 +37,7 @@ export const WfRun: FC<{ wfRunId: WfRunId, tenantId: string }> = ({ wfRunId, ten
<Diagram spec={wfSpec} wfRun={wfRun} nodeRuns={nodeRuns} />

<Variables
variableDefs={wfSpec.threadSpecs[wfRun.threadRuns[threadRunNumber].threadSpecName].variableDefs}
variableDefs={variableDefs}
variables={variables.filter(v => v.id?.threadRunNumber == Number(searchParams.get('threadRunNumber')))}
inheritedVariables={inheritedVariables}
/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,12 @@ import { Metadata } from 'next'
import { notFound } from 'next/navigation'
import { ClientError, Status } from 'nice-grpc-common'
import { WfRun } from './components/WfRun'
import { getWfRun } from '../../../../../actions/getWfRun'
import { WfRunId } from 'littlehorse-client/proto'

type Props = { params: { ids: string[]; tenantId: string } }

export default async function Page({ params: { ids, tenantId } }: Props) {
let wfRunId: WfRunId;
if (ids[1]) {
wfRunId = {
id: ids[1],
parentWfRunId: {
id: ids[0],
}
}
} else {
wfRunId = {
id: ids[0],
}
}

try {
return <WfRun wfRunId={wfRunId} tenantId={tenantId} />
return <WfRun ids={ids} tenantId={tenantId} />
} catch (error) {
if (error instanceof ClientError && error.code === Status.NOT_FOUND) return notFound()
throw error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.littlehorse.common.model.getable.objectId.ExternalEventIdModel;
import io.littlehorse.common.util.LHUtil;
import io.littlehorse.sdk.common.proto.ExternalEventNodeRun;
import io.littlehorse.sdk.common.proto.LHErrorType;
import io.littlehorse.sdk.common.proto.LHStatus;
import io.littlehorse.sdk.common.proto.VariableType;
import io.littlehorse.server.streams.topology.core.ExecutionContext;
Expand Down Expand Up @@ -84,9 +85,14 @@ public ExternalEventNodeRun.Builder toProto() {
}

@Override
public boolean checkIfProcessingCompleted(ProcessorExecutionContext processorContext) {
public boolean checkIfProcessingCompleted(ProcessorExecutionContext processorContext) throws NodeFailureException {
if (externalEventId != null) return true;

if (timedOut) {
FailureModel failure = new FailureModel("ExternalEvent did not arrive in time", LHErrorType.TIMEOUT.name());
throw new NodeFailureException(failure);
}

NodeModel node = nodeRun.getNode();
ExternalEventNodeModel eNode = node.getExternalEventNode();

Expand Down Expand Up @@ -165,7 +171,6 @@ public void processExternalEventTimeout(ExternalEventTimeoutModel timeout) {
return;
}

// This is leaking the logic of the
timedOut = true;
}

Expand Down
24 changes: 24 additions & 0 deletions server/src/test/java/e2e/ExternalEventTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import io.littlehorse.sdk.common.proto.ExternalEvent;
import io.littlehorse.sdk.common.proto.ExternalEventDefId;
import io.littlehorse.sdk.common.proto.ExternalEventId;
import io.littlehorse.sdk.common.proto.Failure;
import io.littlehorse.sdk.common.proto.LHErrorType;
import io.littlehorse.sdk.common.proto.LHStatus;
import io.littlehorse.sdk.common.proto.LittleHorseGrpc.LittleHorseBlockingStub;
import io.littlehorse.sdk.common.proto.PutExternalEventRequest;
Expand All @@ -26,6 +28,9 @@ public class ExternalEventTest {
public static final String EVT_NAME = "basic-test-event";
public static final String IGNORED_EVT_NAME = "not-a-real-event-kenobi";

@LHWorkflow("external-event-timeout")
public Workflow timeoutEvent;

@LHWorkflow("basic-external-event")
public Workflow basicExternalEvent;

Expand All @@ -41,6 +46,25 @@ public Workflow getBasicExternalEventWorkflow() {
});
}

@LHWorkflow("external-event-timeout")
public Workflow getTimeoutWorkflow() {
return Workflow.newWorkflow("external-event-timeout", wf -> {
wf.waitForEvent(EVT_NAME).timeout(1);
});
}

@Test
void shouldTimeoutIfNoEvent() {
verifier.prepareRun(timeoutEvent)
.waitForStatus(LHStatus.ERROR)
.thenVerifyNodeRun(0, 1, nodeRun -> {
Failure failure = nodeRun.getFailures(0);
Assertions.assertThat(failure.getFailureName()).isEqualTo(LHErrorType.TIMEOUT.toString());
Assertions.assertThat(failure.getMessage().toLowerCase()).contains("arrive in time");
})
.start();
}

@Test
void shouldCompleteIfEventIsSentAfterWfRunStarts() {
WfRunId id = WfRunId.newBuilder().setId(LHUtil.generateGuid()).build();
Expand Down

0 comments on commit c6d3846

Please sign in to comment.