-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
lightning: remote backend #58789
base: master
Are you sure you want to change the base?
lightning: remote backend #58789
Conversation
Signed-off-by: zeminzhou <[email protected]>
Hi @zeminzhou. Thanks for your PR. PRs from untrusted users cannot be marked as trusted with I understand the commands that are listed here. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
Signed-off-by: zeminzhou <[email protected]>
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #58789 +/- ##
================================================
+ Coverage 73.0885% 73.5291% +0.4405%
================================================
Files 1676 1721 +45
Lines 463643 493818 +30175
================================================
+ Hits 338870 363100 +24230
- Misses 103924 108348 +4424
- Partials 20849 22370 +1521
Flags with carried forward coverage won't be shown. Click here to find out more.
|
Please add UT for remote backend. |
@@ -433,13 +434,31 @@ func NewImportControllerWithPauser( | |||
if err != nil { | |||
return nil, err | |||
} | |||
case config.BackendRemote: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What error will occur if the user configures the remote backend? Should we prompt the user that this feature is not yet implemented? Additionally, should we add a description in the code comments to avoid confusion for users and developers?
Signed-off-by: zeminzhou <[email protected]>
Signed-off-by: zeminzhou <[email protected]>
Signed-off-by: zeminzhou <[email protected]>
Signed-off-by: zeminzhou <[email protected]>
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
Signed-off-by: zeminzhou <[email protected]>
Signed-off-by: zeminzhou <[email protected]>
Signed-off-by: zeminzhou <[email protected]>
Signed-off-by: zeminzhou <[email protected]>
Signed-off-by: zeminzhou <[email protected]>
Signed-off-by: zeminzhou <[email protected]>
/retest |
@lance6716: Cannot trigger testing until a trusted user reviews the PR and leaves an In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will review later
if err != nil { | ||
return nil, errors.Trace(err) | ||
} | ||
pdHTTPCli = pdhttp.NewClientWithServiceDiscovery( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can add common function to create PD clients. In future we may need to change the construct arguments and forget to modify both backend
pdhttp.WithTLSConfig(tls.TLSConfig()), | ||
).WithBackoffer(retry.InitialBackoffer(time.Second, time.Second, pdutil.PDRequestRetryTime*time.Second)) | ||
|
||
encodingBuilder = local.NewEncodingBuilder(ctx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the branch of remote backend, but it calls the local
package. It makes me think that one nature of local backend is KV encoding in lightning comparing with tidb backend. remote backend also need the local KV encoding functionality, but it has a different way to manage engines. Maybe we can reuse local backend and only add a new type of engines. I'll check this idea when reviewing rest parts
Signed-off-by: zeminzhou <[email protected]>
Signed-off-by: zeminzhou <[email protected]>
[FORMAT CHECKER NOTIFICATION] Notice: To remove the 📖 For more info, you can check the "Contribute Code" section in the development guide. Notice: To remove the For example:
📖 For more info, you can check the "Contribute Code" section in the development guide. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better to add unit test to reveal the problems first when addressing comments
"github.com/pingcap/errors" | ||
) | ||
|
||
type chunkMeta struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe call it chunk
? the name chunkMeta
makes me think it only contains metadata, but in fact it contains chunkData
usingMem bool | ||
} | ||
|
||
func newChunkCache(loadDataTaskID string, writerID uint64, basePath string, usingMem bool) (*chunkCache, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see usingMem
is decided by configuration. Can you explain how to choose the value when generating configuration? Like if source data size is less than 1GB we can use memory
return meta.chunkData, nil | ||
} | ||
|
||
path := filepath.Join(c.baseDir, fmt.Sprintf("chunk-%d", chunkID)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
create a helper function to encode the file path. It can be used by get
, put
and clean
|
||
func (c *chunkCache) put(chunkID uint64, buf []byte) error { | ||
if c.usingMem { | ||
c.chunks[chunkID] = chunkMeta{size: len(buf), chunkData: buf} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not consistent. You also saved the size
in put
, but it's not used in get
if _, ok := c.chunks[chunkID]; !ok { | ||
return nil | ||
} | ||
delete(c.chunks, chunkID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can directly delete
without checking existence
} | ||
} | ||
|
||
func (c *chunkSender) putEmptyChunk(ctx context.Context) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the purpose of empty chunk? Please add comments
state := c.state.Load() | ||
lastFlushedChunkID := state.FlushedChunkID + 1 | ||
for lastFlushedChunkID <= result.FlushedChunkID { | ||
err := c.chunksCache.clean(lastFlushedChunkID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WIll the remote worker restarts again after we clean the chunk? Please add comments to explain the communication protocol and guarantees
err = json.Unmarshal(data, result) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should check result.Cancled
} | ||
} | ||
|
||
state := c.state.Load() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check nil
} | ||
|
||
// make sure all chunks are flushed | ||
flushedChunkID := result.FlushedChunkIDs[c.id] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why the HTTP server will return a map? Why it returns the result of other writer? Please add comments
What problem does this PR solve?
Issue Number: close #xxx
Problem Summary:
What changed and how does it work?
Check List
Tests
Side effects
Documentation
Release note
Please refer to Release Notes Language Style Guide to write a quality release note.