fix: Disk writer cannot send remaining data after etcd shutdown #2228

Draft
Matovidlo wants to merge 10 commits into main from PSGO-993-disk-writer-cannot-send-remaining-data-during-etcd-shutdown
Conversation

Matovidlo
Contributor

@Matovidlo Matovidlo commented Jan 30, 2025

Jira: PSGO-993

Changes:

  • Delete after write no longer happens in the watch module.
    • Statistics seem to be working correctly (PSGO-810).
    • Disk writer seems to be working correctly (PSGO-993).
    • Probably solves PSGO-830 as well.
    • Probably solves PSGO-855 as well.

Issues

  • Removed sorting in ETCD transaction/compact operation to prevent Delete after Create, test included.
  • Do not change distribution nodes when no changes were made
  • Do not change volumes within a connection when no changes were made. Closing and recreating the transport is an expensive operation, so this saves some CPU/memory during an ETCD shutdown.
  • Open the connection in the dialer section. This should prevent issues where connection.volume holds a stale reference instead of an always up-to-date reference from the set of connection volumes.
  • One small race condition on the waitgroup. Use w.file.Sync() instead of the Sync() method in Close, as the waitgroup is no longer needed while the writer is closing (a sketch follows below).
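
For illustration, a minimal sketch of the Close sequence described in the last point. The field names (w.closed, w.writeWg, w.file) and the multi-error helpers are assumptions here, not the actual implementation:

func (w *Writer) Close() error {
    if w.isClosed() {
        return errors.New("writer is already closed")
    }
    // Signal new writes to stop and wait for the in-flight ones.
    close(w.closed)
    w.writeWg.Wait()

    errs := errors.NewMultiError()
    // Sync the underlying file directly; the writer-level Sync() also touches
    // the waitgroup, which is no longer needed at this point.
    if err := w.file.Sync(); err != nil {
        errs.Append(err)
    }
    if err := w.file.Close(); err != nil {
        errs.Append(err)
    }
    return errs.ErrorOrNil()
}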

@Matovidlo Matovidlo force-pushed the PSGO-993-disk-writer-cannot-send-remaining-data-during-etcd-shutdown branch 4 times, most recently from 75c7194 to 1198d92 on January 30, 2025 10:17
if len(n.duplicateNodes) > len(n.sameNodes) {
return Events{}
}

Contributor Author

This misses the comparison of map keys (duplicate vs. same).

Contributor

Why are the IDs not important?

Contributor Author
@Matovidlo Matovidlo Feb 7, 2025

Because they are already present as the map keys (map[key]).
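
For illustration only, a comparison based on the map keys rather than only the map lengths could look roughly like this (field names assumed from the snippet above):

// Suppress events only when every "same" node is also present among the
// duplicated ones, i.e. the batch is a pure delete-and-readd of known nodes.
suppress := len(n.duplicateNodes) >= len(n.sameNodes)
for id := range n.sameNodes {
    if _, ok := n.duplicateNodes[id]; !ok {
        suppress = false
        break
    }
}
if suppress {
    return Events{}
}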


Stream Kubernetes Diff [CI]

Between base 2288780 ⬅️ head ea266b1.

--- /tmp/artifacts/test-k8s-state.old.json.processed.kv	2025-01-30 10:37:26.969272334 +0000
+++ /tmp/artifacts/test-k8s-state.new.json.processed.kv	2025-01-30 10:37:27.356272990 +0000
@@ -200 +200 @@
-<Deployment/stream-api>.spec.template.spec.containers[0].image = "docker.io/keboola/stream-api:2288780-1738232292";
+<Deployment/stream-api>.spec.template.spec.containers[0].image = "docker.io/keboola/stream-api:1198d92-1738232846";
@@ -364 +364 @@
-<Deployment/stream-http-source>.spec.template.spec.containers[0].image = "docker.io/keboola/stream-api:2288780-1738232292";
+<Deployment/stream-http-source>.spec.template.spec.containers[0].image = "docker.io/keboola/stream-api:1198d92-1738232846";
@@ -525 +525 @@
-<Deployment/stream-storage-coordinator>.spec.template.spec.containers[0].image = "docker.io/keboola/stream-api:2288780-1738232292";
+<Deployment/stream-storage-coordinator>.spec.template.spec.containers[0].image = "docker.io/keboola/stream-api:1198d92-1738232846";
@@ -602 +602 @@
-<Endpoints/stream-etcd-headless>.subsets[0].addresses[0].hostname = "stream-etcd-1";
+<Endpoints/stream-etcd-headless>.subsets[0].addresses[0].hostname = "stream-etcd-2";
@@ -606 +606 @@
-<Endpoints/stream-etcd-headless>.subsets[0].addresses[0].targetRef.name = "stream-etcd-1";
+<Endpoints/stream-etcd-headless>.subsets[0].addresses[0].targetRef.name = "stream-etcd-2";
@@ -609 +609 @@
-<Endpoints/stream-etcd-headless>.subsets[0].addresses[1].hostname = "stream-etcd-0";
+<Endpoints/stream-etcd-headless>.subsets[0].addresses[1].hostname = "stream-etcd-1";
@@ -613 +613 @@
-<Endpoints/stream-etcd-headless>.subsets[0].addresses[1].targetRef.name = "stream-etcd-0";
+<Endpoints/stream-etcd-headless>.subsets[0].addresses[1].targetRef.name = "stream-etcd-1";
@@ -616 +616 @@
-<Endpoints/stream-etcd-headless>.subsets[0].addresses[2].hostname = "stream-etcd-2";
+<Endpoints/stream-etcd-headless>.subsets[0].addresses[2].hostname = "stream-etcd-0";
@@ -620 +620 @@
-<Endpoints/stream-etcd-headless>.subsets[0].addresses[2].targetRef.name = "stream-etcd-2";
+<Endpoints/stream-etcd-headless>.subsets[0].addresses[2].targetRef.name = "stream-etcd-0";
@@ -653 +653 @@
-<Endpoints/stream-etcd>.subsets[0].addresses[0].targetRef.name = "stream-etcd-1";
+<Endpoints/stream-etcd>.subsets[0].addresses[0].targetRef.name = "stream-etcd-2";
@@ -659 +659 @@
-<Endpoints/stream-etcd>.subsets[0].addresses[1].targetRef.name = "stream-etcd-0";
+<Endpoints/stream-etcd>.subsets[0].addresses[1].targetRef.name = "stream-etcd-1";
@@ -665 +665 @@
-<Endpoints/stream-etcd>.subsets[0].addresses[2].targetRef.name = "stream-etcd-2";
+<Endpoints/stream-etcd>.subsets[0].addresses[2].targetRef.name = "stream-etcd-0";
@@ -717 +717 @@
-<Endpoints/stream-storage-writer-reader>.subsets[0].addresses[0].hostname = "stream-storage-writer-reader-0";
+<Endpoints/stream-storage-writer-reader>.subsets[0].addresses[0].hostname = "stream-storage-writer-reader-1";
@@ -721 +721 @@
-<Endpoints/stream-storage-writer-reader>.subsets[0].addresses[0].targetRef.name = "stream-storage-writer-reader-0";
+<Endpoints/stream-storage-writer-reader>.subsets[0].addresses[0].targetRef.name = "stream-storage-writer-reader-1";
@@ -724 +724 @@
-<Endpoints/stream-storage-writer-reader>.subsets[0].addresses[1].hostname = "stream-storage-writer-reader-1";
+<Endpoints/stream-storage-writer-reader>.subsets[0].addresses[1].hostname = "stream-storage-writer-reader-0";
@@ -728 +728 @@
-<Endpoints/stream-storage-writer-reader>.subsets[0].addresses[1].targetRef.name = "stream-storage-writer-reader-1";
+<Endpoints/stream-storage-writer-reader>.subsets[0].addresses[1].targetRef.name = "stream-storage-writer-reader-0";
@@ -1214,2 +1214,2 @@
-<Pod/stream-api-<hash>>.spec.containers[0].image = "docker.io/keboola/stream-api:2288780-1738232292";
-<Pod/stream-api-<hash>>.spec.containers[0].image = "docker.io/keboola/stream-api:2288780-1738232292";
+<Pod/stream-api-<hash>>.spec.containers[0].image = "docker.io/keboola/stream-api:1198d92-1738232846";
+<Pod/stream-api-<hash>>.spec.containers[0].image = "docker.io/keboola/stream-api:1198d92-1738232846";
@@ -1534 +1534 @@
-<Pod/stream-etcd-0>.spec.containers[0].env[21].value = "new";
+<Pod/stream-etcd-0>.spec.containers[0].env[21].value = "existing";
@@ -1780 +1780 @@
-<Pod/stream-etcd-1>.spec.containers[0].env[21].value = "new";
+<Pod/stream-etcd-1>.spec.containers[0].env[21].value = "existing";
@@ -2026 +2026 @@
-<Pod/stream-etcd-2>.spec.containers[0].env[21].value = "new";
+<Pod/stream-etcd-2>.spec.containers[0].env[21].value = "existing";
@@ -2350,2 +2350,2 @@
-<Pod/stream-http-source-<hash>>.spec.containers[0].image = "docker.io/keboola/stream-api:2288780-1738232292";
-<Pod/stream-http-source-<hash>>.spec.containers[0].image = "docker.io/keboola/stream-api:2288780-1738232292";
+<Pod/stream-http-source-<hash>>.spec.containers[0].image = "docker.io/keboola/stream-api:1198d92-1738232846";
+<Pod/stream-http-source-<hash>>.spec.containers[0].image = "docker.io/keboola/stream-api:1198d92-1738232846";
@@ -2742,2 +2742,2 @@
-<Pod/stream-storage-coordinator-<hash>>.spec.containers[0].image = "docker.io/keboola/stream-api:2288780-1738232292";
-<Pod/stream-storage-coordinator-<hash>>.spec.containers[0].image = "docker.io/keboola/stream-api:2288780-1738232292";
+<Pod/stream-storage-coordinator-<hash>>.spec.containers[0].image = "docker.io/keboola/stream-api:1198d92-1738232846";
+<Pod/stream-storage-coordinator-<hash>>.spec.containers[0].image = "docker.io/keboola/stream-api:1198d92-1738232846";
@@ -2978 +2978 @@
-<Pod/stream-storage-writer-reader-0>.spec.containers[0].image = "docker.io/keboola/stream-api:2288780-1738232292";
+<Pod/stream-storage-writer-reader-0>.spec.containers[0].image = "docker.io/keboola/stream-api:1198d92-1738232846";
@@ -3061 +3061 @@
-<Pod/stream-storage-writer-reader-0>.spec.containers[1].image = "docker.io/keboola/stream-api:2288780-1738232292";
+<Pod/stream-storage-writer-reader-0>.spec.containers[1].image = "docker.io/keboola/stream-api:1198d92-1738232846";
@@ -3231 +3231 @@
-<Pod/stream-storage-writer-reader-1>.spec.containers[0].image = "docker.io/keboola/stream-api:2288780-1738232292";
+<Pod/stream-storage-writer-reader-1>.spec.containers[0].image = "docker.io/keboola/stream-api:1198d92-1738232846";
@@ -3314 +3314 @@
-<Pod/stream-storage-writer-reader-1>.spec.containers[1].image = "docker.io/keboola/stream-api:2288780-1738232292";
+<Pod/stream-storage-writer-reader-1>.spec.containers[1].image = "docker.io/keboola/stream-api:1198d92-1738232846";
@@ -3552 +3552 @@
-<ReplicaSet/stream-api-<hash>>.spec.template.spec.containers[0].image = "docker.io/keboola/stream-api:2288780-1738232292";
+<ReplicaSet/stream-api-<hash>>.spec.template.spec.containers[0].image = "docker.io/keboola/stream-api:1198d92-1738232846";
@@ -3723 +3723 @@
-<ReplicaSet/stream-http-source-<hash>>.spec.template.spec.containers[0].image = "docker.io/keboola/stream-api:2288780-1738232292";
+<ReplicaSet/stream-http-source-<hash>>.spec.template.spec.containers[0].image = "docker.io/keboola/stream-api:1198d92-1738232846";
@@ -3891 +3891 @@
-<ReplicaSet/stream-storage-coordinator-<hash>>.spec.template.spec.containers[0].image = "docker.io/keboola/stream-api:2288780-1738232292";
+<ReplicaSet/stream-storage-coordinator-<hash>>.spec.template.spec.containers[0].image = "docker.io/keboola/stream-api:1198d92-1738232846";
@@ -3932,0 +3933,12 @@
+<Secret/sh.helm.release.v1.stream-etcd.v2> = {};
+<Secret/sh.helm.release.v1.stream-etcd.v2>.apiVersion = "v1";
+<Secret/sh.helm.release.v1.stream-etcd.v2>.data = {};
+<Secret/sh.helm.release.v1.stream-etcd.v2>.kind = "Secret";
+<Secret/sh.helm.release.v1.stream-etcd.v2>.metadata = {};
+<Secret/sh.helm.release.v1.stream-etcd.v2>.metadata.labels = {};
+<Secret/sh.helm.release.v1.stream-etcd.v2>.metadata.labels.name = "stream-etcd";
+<Secret/sh.helm.release.v1.stream-etcd.v2>.metadata.labels.owner = "helm";
+<Secret/sh.helm.release.v1.stream-etcd.v2>.metadata.labels.version = "2";
+<Secret/sh.helm.release.v1.stream-etcd.v2>.metadata.name = "sh.helm.release.v1.stream-etcd.v2";
+<Secret/sh.helm.release.v1.stream-etcd.v2>.metadata.namespace = "stream";
+<Secret/sh.helm.release.v1.stream-etcd.v2>.type = "helm.sh/release.v1";
@@ -4228 +4240 @@
-<StatefulSet/stream-etcd>.spec.template.spec.containers[0].env[21].value = "new";
+<StatefulSet/stream-etcd>.spec.template.spec.containers[0].env[21].value = "existing";
@@ -4478 +4490 @@
-<StatefulSet/stream-storage-writer-reader>.spec.template.spec.containers[0].image = "docker.io/keboola/stream-api:2288780-1738232292";
+<StatefulSet/stream-storage-writer-reader>.spec.template.spec.containers[0].image = "docker.io/keboola/stream-api:1198d92-1738232846";
@@ -4558 +4570 @@
-<StatefulSet/stream-storage-writer-reader>.spec.template.spec.containers[1].image = "docker.io/keboola/stream-api:2288780-1738232292";
+<StatefulSet/stream-storage-writer-reader>.spec.template.spec.containers[1].image = "docker.io/keboola/stream-api:1198d92-1738232846";


(see artifacts in the GitHub Action for more information)

@Matovidlo Matovidlo changed the title fix: Cisk writer cannot send remaining data after etcd shutdown fix: Disk writer cannot send remaining data after etcd shutdown Jan 30, 2025
@Matovidlo Matovidlo force-pushed the PSGO-993-disk-writer-cannot-send-remaining-data-during-etcd-shutdown branch 2 times, most recently from 64bef68 to 34a1ff2 on February 3, 2025 13:00
@Matovidlo Matovidlo marked this pull request as ready for review February 5, 2025 08:21
Reason: It caused issues with missing entries that were actually dispatched but deleted afterwards using the `watch` feature of ETCD. Prevents disconnect during writing into an open slice. Also does not cause closing of the slice.

Reason: During the disconnect phase we lost the dial-up of the connection. We always need to check the volume connection during the dial-up phase.
@Matovidlo Matovidlo force-pushed the PSGO-993-disk-writer-cannot-send-remaining-data-during-etcd-shutdown branch from fc6d2c8 to 62f7d33 on February 6, 2025 09:24
Waitgroup during `Close()` is not needed
Contributor
@jachym-tousek-keboola jachym-tousek-keboola left a comment

Overall I feel like this PR is too big. Not code-wise, as each of the changes is relatively small, but they're difficult to understand and not very related, so it gets a bit confusing when they're mixed up in one PR like this. In my opinion it would be better if this PR was split so that we can discuss each fix separately.

Comment on lines 255 to 258
// Make sure that nodes are updated only when there is maximum nodes
if len(n.duplicateNodes) > len(n.sameNodes) {
return Events{}
}
Contributor

Okay, I don't understand this. If I understand correctly from the call, you want to suppress the events if a node was both deleted and added, right?

Isn't this too aggressive then? It looks like you suppress all events if there are fewer of them than previously?

Contributor Author

The issue only occurs when you downsize the cluster, e.g. from 4 nodes to 2 nodes. It requires a restart of all services; it cannot be done otherwise. I will try to write a test that shows this, but I cannot do it with any other approach.
I think this is a very important topic that you need to understand in depth (the ETCD mirror, the watch operation, and restarts of ETCD), otherwise the PRs and CRs will not make sense.


Comment on lines +104 to +105
sameNodes: make(map[string]nodeChange),
duplicateNodes: make(map[string]nodeChange),
Contributor

Can we improve distribution/node_test.go to test these changes?
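
A rough outline of what such a test could look like; the constructor and event helpers below are hypothetical, not the existing test API (standard testing and testify/assert imports assumed):

func TestNode_NoEventsWhenNodeIsDeletedAndReadded(t *testing.T) {
    t.Parallel()

    n := newTestNode(t) // hypothetical helper

    // Simulate a watch restart: the same node is deleted and re-added in one
    // batch, which should not produce any distribution events.
    events := n.applyTestEvents(
        testEvent{Type: "delete", NodeID: "node-1"},
        testEvent{Type: "put", NodeID: "node-1"},
    )
    assert.Empty(t, events)
}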


Comment on lines +151 to +153
if len(m.oldVolumes) > len(activeNodes) {
activeNodes = m.oldVolumes
}
Contributor

Similarly to the distribution change, I don't understand this. I would expect != but > is surprising.

Contributor Author

I would like to encourage you to test it yourself instead of asking.
Simply put, the operations in watchMirror are streamed one by one. So if I restart a watch, it goes like this:

DEL vol1 modRev 3
DEL vol2 modRev 4
PUT vol1 modRev 5
PUT vol2 modRev 6

So based on your comparison, I would effectively go through the rest of the code.
This code has one issue, and that is downsizing the number of volumes, e.g. 2 -> 1. It requires a restart. I think it cannot be done safely other than by restarting the whole deployment.

Contributor

I would like to encourage you to test it yourself instead of asking.

I don't know what the scenario is.

Anyway, I have been looking at the code for a while and comparing it with the old code, and your changes still don't make any sense to me... We have a mirrored map of volumes, and when they change we need to open connections to the new volumes and close the connections to the old volumes, right? At least that's what the old code did.

Now if I understand the code correctly, updateConnections is called on every revision, right? And the goal is to prevent disconnecting a node if it is re-added soon after? And it also looks like your code assumes that each revision has a maximum of one volume change? How is that guaranteed?

Contributor

Recap from calls: The relevant deletes and puts seem to be within a single revision. The code in distribution/node.go processed every event independently, so it wasn't able to catch no-ops when a DELETE and a PUT appeared together. In connection.go, however, this is already the case due to how MirrorMap works internally: the WithOnUpdate callback is only called once at the end of the revision instead of for each event independently. This is evident when comparing WithForEach and for ... range events { in node.go vs watch_mirror_map.go. Because of that, I believe we actually only need the distribution changes but not the connection changes.

Martin said that the distribution changes alone were not sufficient though, so he'll repeat the testing scenario to gather more information about what was wrong in that case.
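
A small self-contained sketch of the difference described above (generic code, not the real MirrorMap API): a per-event callback fires twice for a DELETE plus a PUT of the same key, while a per-revision callback only sees the final, unchanged state.

package main

import "fmt"

type event struct {
    op  string // "delete" or "put"
    key string
}

// applyPerEvent calls onChange for every event, as distribution/node.go did.
func applyPerEvent(state map[string]bool, events []event, onChange func(string)) {
    for _, e := range events {
        if e.op == "delete" {
            delete(state, e.key)
        } else {
            state[e.key] = true
        }
        onChange(e.key)
    }
}

// applyPerRevision applies the whole batch first and notifies once, similar to
// how the WithOnUpdate callback is described in the recap above.
func applyPerRevision(state map[string]bool, events []event, onUpdate func(map[string]bool)) {
    for _, e := range events {
        if e.op == "delete" {
            delete(state, e.key)
        } else {
            state[e.key] = true
        }
    }
    onUpdate(state)
}

func main() {
    batch := []event{{"delete", "vol1"}, {"put", "vol1"}}
    applyPerEvent(map[string]bool{"vol1": true}, batch, func(k string) {
        fmt.Println("per-event change:", k) // fires twice: delete, then put
    })
    applyPerRevision(map[string]bool{"vol1": true}, batch, func(s map[string]bool) {
        fmt.Println("per-revision state:", s) // fires once, state is unchanged
    })
}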

Comment on lines +156 to +157
m.logger.Infof(ctx, "no changes in volumes, skipping connection update")
return
Contributor

I'd expect a new test that asserts this log line.

Contributor Author
@Matovidlo Matovidlo Feb 7, 2025

Please help me with that and adjust the test based on this branch. I have 4 more issues on the table.
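
For what it's worth, a sketch of how such a test might be shaped; the debug logger, setup helper, and assertion helper below are assumptions about the project's test utilities, not references to existing ones:

func TestUpdateConnections_NoVolumeChanges(t *testing.T) {
    t.Parallel()

    logger := log.NewDebugLogger()           // assumed in-memory test logger
    m := newTestConnectionManager(t, logger) // hypothetical setup helper
    ctx := context.Background()

    // Apply the same set of volumes twice; the second call should be a no-op.
    m.updateConnections(ctx)
    m.updateConnections(ctx)

    // Expect exactly the log line added in this PR.
    logger.AssertJSONMessages(t, `{"level":"info","message":"no changes in volumes, skipping connection update"}`)
}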

Comment on lines +147 to +150
if s.Code() == codes.Unavailable && errors.Is(s.Err(), io.EOF) {
onServerTermination(ctx, "remote server shutdown")
return
}
Contributor

Can you explain this block please?

Contributor Author

I think it is better to go through how gRPC and networking work. I think that's where we still have a problem with streaming at a fast pace, and this will probably be reworked if we want to achieve higher throughput.
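
For context, a small standalone sketch of the classification from the quoted block; the surrounding function and callback are stand-ins, and only the status/codes condition mirrors the change itself:

import (
    "context"
    "errors"
    "io"

    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// handleStreamErr reports whether err is treated as a remote server shutdown,
// using the same condition as the quoted block: gRPC code Unavailable with
// io.EOF in the status error chain.
func handleStreamErr(ctx context.Context, err error, onServerTermination func(context.Context, string)) bool {
    s, ok := status.FromError(err)
    if ok && s.Code() == codes.Unavailable && errors.Is(s.Err(), io.EOF) {
        onServerTermination(ctx, "remote server shutdown")
        return true
    }
    return false
}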

Comment on lines +166 to +172
errs := errors.NewMultiError()
if w.isClosed() {
return errors.New(`writer is already closed`)
}
close(w.closed)

errs := errors.NewMultiError()

// Wait for running writes
close(w.closed)
Contributor
@jachym-tousek-keboola jachym-tousek-keboola Feb 6, 2025

Looking closer at this, I think these changes don't do anything? Why would it matter when exactly errs := errors.NewMultiError() is created? You aren't using it sooner or anything... I'd recommend amending the commit to remove this change, as it's just confusing but doesn't do anything as far as I can tell. I assume you were trying something that you didn't need in the end.

Contributor Author

I am more convinient with variable declaration at top of method instead of doing that throughout the function.

Contributor

I'm not, since this way the variable is declared even when it's not needed (when the writer is already closed).

@Matovidlo
Contributor Author

@jachym-tousek-keboola I understand your concern about the PR, but it fixes 4 related things in the disk writer communication. It is a set of maybe unrelated changes, but they are leftovers that were not tested. I think there is no real reason to split this, because all 4 issues have to be addressed, otherwise the disk writer does not work properly.

@Matovidlo Matovidlo marked this pull request as draft February 11, 2025 13:00