Skip to content

Commit

Permalink
Added eventing tracing fast-integration tests (#12355)
Browse files Browse the repository at this point in the history
* added eventing tracing test for commerce-mock flow and in-cluster flow

* changed portName to fix tracing for publisher proxy

* addressed review comments 1
  • Loading branch information
mfaizanse authored Oct 20, 2021
1 parent ab7ad4a commit 426d89c
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 11 deletions.
16 changes: 16 additions & 0 deletions tests/fast-integration/eventing-test/eventing-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ const {
ensureCommerceMockLocalTestFixture,
checkFunctionResponse,
sendEventAndCheckResponse,
sendLegacyEventAndCheckTracing,
cleanMockTestFixture,
checkInClusterEventDelivery,
checkInClusterEventTracing,
waitForSubscriptionsTillReady,
setEventMeshSourceNamespace,
} = require("../test/fixtures/commerce-mock");
Expand Down Expand Up @@ -46,6 +48,16 @@ describe("Eventing tests", function () {
});
}

// eventingTracingTestSuite - Runs Eventing tracing tests
function eventingTracingTestSuite () {
it("order.created.v1 event from CommerceMock should have correct tracing spans", async function () {
await sendLegacyEventAndCheckTracing();
});
it("In-cluster event should have correct tracing spans", async function () {
await checkInClusterEventTracing(testNamespace);
});
}

before(async function() {
// runs once before the first test in this block

Expand Down Expand Up @@ -91,6 +103,8 @@ describe("Eventing tests", function () {
context('with Nats backend', function() {
// Running Eventing end-to-end tests
eventingE2ETestSuite();
// Running Eventing tracing tests
eventingTracingTestSuite();
});

context('with BEB backend', function() {
Expand All @@ -116,5 +130,7 @@ describe("Eventing tests", function () {

// Running Eventing end-to-end tests
eventingE2ETestSuite();
// Running Eventing tracing tests
eventingTracingTestSuite();
});
});
41 changes: 41 additions & 0 deletions tests/fast-integration/monitoring/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ const {
kubectlPortForward,
retryPromise,
convertAxiosError,
listPods,
debug,
} = require("../utils");

const SECOND = 1000;
let prometheusPort = 9090;
let jaegerPort = 16686;

function prometheusPortForward() {
return kubectlPortForward("kyma-system", "prometheus-monitoring-prometheus-0", prometheusPort);
Expand Down Expand Up @@ -90,11 +94,48 @@ async function queryGrafana(url, redirectURL, ignoreSSL, httpErrorCode) {
}
}

async function jaegerPortForward() {
const res = await getJaegerPods();
if (res.body.items.length === 0) {
throw new Error("cannot find any jaeger pods");
}

return kubectlPortForward("kyma-system", res.body.items[0].metadata.name, jaegerPort);
}

async function getJaegerPods() {
let labelSelector = `app=jaeger,app.kubernetes.io/component=all-in-one,app.kubernetes.io/instance=tracing-jaeger,app.kubernetes.io/managed-by=jaeger-operator,app.kubernetes.io/name=tracing-jaeger,app.kubernetes.io/part-of=jaeger`;
return listPods(labelSelector, "kyma-system");
}

async function getJaegerTrace(traceId) {
let path = `/api/traces/${traceId}`;
let url = `http://localhost:${jaegerPort}${path}`;

try {
debug(`fetching trace: ${traceId} from jaeger`)
let responseBody = await retryPromise(
() => {
debug(`waiting for trace (id: ${traceId}) from jaeger...`)
return axios.get(url, {timeout: 30 * SECOND})
},
30,
1000
);

return responseBody.data;
} catch (err) {
throw convertAxiosError(err, "cannot get jaeger trace");
}
}

module.exports = {
prometheusPortForward,
getPrometheusActiveTargets,
getPrometheusAlerts,
getPrometheusRuleGroups,
queryPrometheus,
queryGrafana,
jaegerPortForward,
getJaegerTrace,
};
105 changes: 100 additions & 5 deletions tests/fast-integration/test/fixtures/commerce-mock/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,19 @@ const {
k8sDelete,
getSecretData,
namespaceObj,
serviceInstanceObj
serviceInstanceObj,
getTraceDAG,
} = require("../../../utils");

const {
registerOrReturnApplication,
} = require("../../../compass");

const {
jaegerPortForward,
getJaegerTrace,
} = require("../../../monitoring/client")

const {
OAuthToken,
OAuthCredentials
Expand Down Expand Up @@ -141,7 +147,7 @@ async function sendEventAndCheckResponse() {
const mockHost = vs.spec.hosts[0];
const host = mockHost.split(".").slice(1).join(".");

await retryPromise(
return await retryPromise(
async () => {
await axios
.post(
Expand Down Expand Up @@ -189,6 +195,91 @@ async function sendEventAndCheckResponse() {
);
}

async function sendLegacyEventAndCheckTracing() {
// Send an event and get it back from the lastorder function
const res = await sendEventAndCheckResponse();
expect(res.data).to.have.nested.property("event.headers.x-b3-traceid");
expect(res.data).to.have.nested.property("podName");

// Extract traceId from response
const traceId = res.data.event.headers["x-b3-traceid"];

// Define expected trace data
const correctTraceSpansLength = 6;
const correctTraceProcessSequence = [
'istio-ingressgateway.istio-system',
'central-application-connectivity-validator.kyma-system',
'central-application-connectivity-validator.kyma-system',
'eventing-publisher-proxy.kyma-system',
'eventing-controller.kyma-system',
`lastorder-${res.data.podName.split('-')[1]}.test`,
];

// wait sometime for jaeger to complete tracing data
await sleep(10 * 1000)
await checkTrace(traceId, correctTraceSpansLength, correctTraceProcessSequence)
}

async function checkInClusterEventTracing(targetNamespace) {
const res = await checkInClusterEventDeliveryHelper(targetNamespace, 'structured');
expect(res.data).to.have.nested.property("event.headers.x-b3-traceid");
expect(res.data).to.have.nested.property("podName");

// Extract traceId from response
const traceId = res.data.event.headers["x-b3-traceid"];

// Define expected trace data
const correctTraceSpansLength = 4;
const correctTraceProcessSequence = [
`lastorder-${res.data.podName.split('-')[1]}.test`, // We are sending the in-cluster event from inside the lastorder pod.
'eventing-publisher-proxy.kyma-system',
'eventing-controller.kyma-system',
`lastorder-${res.data.podName.split('-')[1]}.test`,
];

// wait sometime for jaeger to complete tracing data
await sleep(10 * 1000)
await checkTrace(traceId, correctTraceSpansLength, correctTraceProcessSequence)
}

async function checkTrace(traceId, expectedTraceLength, expectedTraceProcessSequence) {
// Port-forward to Jaeger and fetch trace data for the traceId
const cancelJaegerPortForward = await jaegerPortForward();
var traceRes;
try{
traceRes = await getJaegerTrace(traceId)
}
finally{
// finally block will run even if exception is thrown (reference: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/try...catch#the_finally-block)
cancelJaegerPortForward();
}

// the trace response should have data for single trace
expect(traceRes.data).to.have.length(1)

// Extract trace data from response
const traceData = traceRes.data[0]
expect(traceData["spans"]).to.have.length(expectedTraceLength)

// Generate DAG for trace spans
const traceDAG = await getTraceDAG(traceData)
expect(traceDAG).to.have.length(1)

// Check the tracing spans are correct
let currentSpan = traceDAG[0]
for (let i = 0; i < expectedTraceLength; i++) {
const processServiceName = traceData.processes[currentSpan.processID].serviceName;
debug(`Checking Trace Sequence # ${i}: Expected process: ${expectedTraceProcessSequence[i]}, Received process: ${processServiceName}`)
expect(processServiceName).to.be.equal(expectedTraceProcessSequence[i]);

// Traverse to next trace span
if (i < expectedTraceLength - 1) {
expect(currentSpan.childSpans).to.have.length(1)
currentSpan = currentSpan.childSpans[0]
}
}
}

async function registerAllApis(mockHost) {
debug("Listing Commerce Mock local APIs")
const localApis = await axios.get(`https://${mockHost}/local/apis`, { timeout: 5000 }).catch((err) => {
Expand Down Expand Up @@ -520,20 +611,24 @@ async function checkInClusterEventDeliveryHelper(targetNamespace, encoding) {
// send event using function query parameter send=true
await retryPromise(() => axios.post(`https://${mockHost}`, { id: eventId }, { params: { send: true, encoding: encoding } }), 10, 1000)
// verify if event was received using function query parameter inappevent=eventId
await retryPromise(async () => {
return await retryPromise(async () => {
debug("Waiting for event: ", eventId);
let response = await axios.get(`https://${mockHost}`, { params: { inappevent: eventId } })
expect(response).to.have.nested.property("data.id", eventId, "The same event id expected in the result");
expect(response).to.have.nested.property("data.shipped", true, "Order should have property shipped");
expect(response.data).to.have.nested.property("event.id", eventId, "The same event id expected in the result");
expect(response.data).to.have.nested.property("event.shipped", true, "Order should have property shipped");

return response;
}, 30, 2 * 1000);
}

module.exports = {
ensureCommerceMockLocalTestFixture,
ensureCommerceMockWithCompassTestFixture,
sendEventAndCheckResponse,
sendLegacyEventAndCheckTracing,
checkFunctionResponse,
checkInClusterEventDelivery,
checkInClusterEventTracing,
cleanMockTestFixture,
deleteMockTestFixture,
waitForSubscriptionsTillReady,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ spec:
"ce-eventtypeversion": "v1",
"ce-id": (data.id || "dummyId"),
"ce-type": type,
"Content-Type": "application/json"
"Content-Type": "application/json",
"X-B3-Sampled": 1
}
}
}
Expand All @@ -47,7 +48,8 @@ spec:
data
},
headers: {
"Content-Type": "application/cloudevents+json"
"Content-Type": "application/cloudevents+json",
"X-B3-Sampled": 1
}
}
}
Expand Down Expand Up @@ -89,18 +91,18 @@ spec:
console.dir(e)
}
} else if (event.extensions.request.query.inappevent) {
return inAppEvent[event.extensions.request.query.inappevent];
return {event:inAppEvent[event.extensions.request.query.inappevent], podName: process.env.HOSTNAME};
} else if (event["ce-type"] && event["ce-type"].includes("order.received")){
console.log("Got in-app event:", event.data);
inAppEvent[event.data.id] = { ...cloudEventHeaders(event), shipped:true, ...event.data };
inAppEvent[event.data.id] = { ...cloudEventHeaders(event), shipped:true, ...event.data, headers:event.extensions.request.headers };
} else if (event["ce-type"] && event["ce-type"].includes("order.created")) {
console.log("Order created event:", event.data)
lastEvent = { ...cloudEventHeaders(event), data: event.data };
lastEvent = { ...cloudEventHeaders(event), data: event.data, headers:event.extensions.request.headers };
}
if (event.data && event.data.orderCode) {
lastOrder = await getOrder(event.data.orderCode);
}
return {order:lastOrder, event:lastEvent};
return {order:lastOrder, event:lastEvent, podName: process.env.HOSTNAME};
}
}
---
Expand Down
26 changes: 26 additions & 0 deletions tests/fast-integration/utils/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -1519,6 +1519,31 @@ async function printEventingControllerLogs() {
}
}

// getTraceDAG returns a DAG for the provided Jaeger tracing data
async function getTraceDAG(trace) {
// Find root spans which are not child of any other span
const rootSpans = trace["spans"].filter((s) => !(s["references"].find((r) => r["refType"] === "CHILD_OF")))

// Find and attach child spans for each root span
for (const root of rootSpans) {
await attachTraceChildSpans(root, trace);
}
return rootSpans
}

// attachChildSpans finds child spans of current parentSpan and attach it to parentSpan object
// and also recursively, finds and attaches further child spans of each child.
async function attachTraceChildSpans(parentSpan, trace) {
// find child spans of current parentSpan and attach it to parentSpan object
parentSpan["childSpans"] = trace["spans"].filter((s) => s["references"].find((r) => r["refType"] === "CHILD_OF" && r["spanID"] === parentSpan["spanID"] && r["traceID"] === parentSpan["traceID"]));
// recursively, find and attach further child span of each parentSpan["childSpans"]
if (parentSpan["childSpans"] && parentSpan["childSpans"].length > 0) {
for (const child of parentSpan["childSpans"]) {
await attachTraceChildSpans(child, trace);
}
}
}

module.exports = {
initializeK8sClient,
retryPromise,
Expand Down Expand Up @@ -1599,4 +1624,5 @@ module.exports = {
printEventingPublisherProxyLogs,
createEventingBackendK8sSecret,
deleteEventingBackendK8sSecret,
getTraceDAG,
};

0 comments on commit 426d89c

Please sign in to comment.