[reconfigurator] Reject clickhouse configurations from old generations #7347
@@ -3,26 +3,50 @@
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use crate::{ClickhouseCli, Clickward};

use anyhow::{anyhow, bail, Result};
use camino::Utf8PathBuf;
use clickhouse_admin_types::{
    CLICKHOUSE_KEEPER_CONFIG_DIR, CLICKHOUSE_KEEPER_CONFIG_FILE,
    CLICKHOUSE_SERVER_CONFIG_DIR, CLICKHOUSE_SERVER_CONFIG_FILE,
};
use omicron_common::address::CLICKHOUSE_TCP_PORT;
use omicron_common::api::external::Generation;
use oximeter_db::Client as OximeterClient;
use slog::Logger;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::net::SocketAddrV6;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::Mutex;

pub struct KeeperServerContext {
    clickward: Clickward,
    clickhouse_cli: ClickhouseCli,
    log: Logger,
    pub generation: Mutex<Option<Generation>>,
}

impl KeeperServerContext {
    pub fn new(clickhouse_cli: ClickhouseCli) -> Self {
        let log = clickhouse_cli
            .log
            .new(slog::o!("component" => "KeeperServerContext"));
    pub fn new(
        log: &Logger,
        binary_path: Utf8PathBuf,
        listen_address: SocketAddrV6,
    ) -> Result<Self> {
        let clickhouse_cli =
            ClickhouseCli::new(binary_path, listen_address, log);
Comment on lines +40 to +46:
Refactor as well: there was no need to pass clickhouse_cli as a parameter, but not clickward etc.

        let log = log.new(slog::o!("component" => "KeeperServerContext"));
        let clickward = Clickward::new();
        Self { clickward, clickhouse_cli, log }
        let config_path = Utf8PathBuf::from_str(CLICKHOUSE_KEEPER_CONFIG_DIR)
Nit - I think all the uses of
            .unwrap()
            .join(CLICKHOUSE_KEEPER_CONFIG_FILE);

        // If there is already a configuration file with a generation number we'll
        // use that. Otherwise, we set the generation number to None.
        let gen = read_generation_from_file(config_path)?;
        let generation = Mutex::new(gen);
It's become practice at Oxide to avoid tokio mutexes wherever possible as they have significant problems when cancelled and generally just don't do what we want. I realize there's already some usage here with regards to initialization. We don't have to fix that in this PR, but we should avoid adding new uses. We should instead use a `std::sync::Mutex`. See the following for more details:

lol I was definitely on the fence on that one, I went for consistency in the end be1afc7#diff-c816600501b7aaa7de4a2eb9dc86498662030cea6390fa23e11a22c990efb510L28-L29

Thanks for the links! Hadn't seen those RFDs, will read them both
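As a hedged illustration of that suggestion (a sketch, not code from this PR): the generation field only needs to be read and replaced atomically, so a `std::sync::Mutex` whose guard is held just long enough to copy or swap the value would work. The `set_generation` helper below is hypothetical.

```rust
use std::sync::Mutex;

use omicron_common::api::external::Generation;

pub struct KeeperServerContext {
    // ...other fields from the PR elided...
    // Plain std mutex: the guard is only held long enough to copy or replace
    // the value, never across an .await point.
    generation: Mutex<Option<Generation>>,
}

impl KeeperServerContext {
    // Read access no longer needs to be async.
    pub fn generation(&self) -> Option<Generation> {
        *self.generation.lock().unwrap()
    }

    // Hypothetical setter: writers swap in the new generation under the lock.
    pub fn set_generation(&self, gen: Generation) {
        *self.generation.lock().unwrap() = Some(gen);
    }
}
```

The key property is that the guard never lives across an `.await`, which sidesteps the cancellation concerns raised above.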

        Ok(Self { clickward, clickhouse_cli, log, generation })
    }

    pub fn clickward(&self) -> &Clickward {

@@ -36,6 +60,10 @@ impl KeeperServerContext {
    pub fn log(&self) -> &Logger {
        &self.log
    }

    pub async fn generation(&self) -> Option<Generation> {
        *self.generation.lock().await
We only need read access here, and so we can easily avoid an async mutex here.

Yeah, I was wrong here. I wasn't considering the usage of the generation with regards to concurrent requests.
    }
}

pub struct ServerContext {

@@ -44,24 +72,40 @@ pub struct ServerContext {
    oximeter_client: OximeterClient,
    initialization_lock: Arc<Mutex<()>>,
    log: Logger,
    pub generation: Mutex<Option<Generation>>,
}

impl ServerContext {
    pub fn new(clickhouse_cli: ClickhouseCli) -> Self {
        let ip = clickhouse_cli.listen_address.ip();
    pub fn new(
        log: &Logger,
        binary_path: Utf8PathBuf,
        listen_address: SocketAddrV6,
    ) -> Result<Self> {
        let clickhouse_cli =
            ClickhouseCli::new(binary_path, listen_address, log);

        let ip = listen_address.ip();
        let address = SocketAddrV6::new(*ip, CLICKHOUSE_TCP_PORT, 0, 0);
        let oximeter_client =
            OximeterClient::new(address.into(), &clickhouse_cli.log);
        let oximeter_client = OximeterClient::new(address.into(), log);
        let clickward = Clickward::new();
        let log =
            clickhouse_cli.log.new(slog::o!("component" => "ServerContext"));
        Self {
        let log = log.new(slog::o!("component" => "ServerContext"));

        let config_path = Utf8PathBuf::from_str(CLICKHOUSE_SERVER_CONFIG_DIR)
            .unwrap()
            .join(CLICKHOUSE_SERVER_CONFIG_FILE);

        // If there is already a configuration file with a generation number we'll
        // use that. Otherwise, we set the generation number to None.
        let gen = read_generation_from_file(config_path)?;
        let generation = Mutex::new(gen);
        Ok(Self {
            clickhouse_cli,
            clickward,
            oximeter_client,
            initialization_lock: Arc::new(Mutex::new(())),
            log,
        }
            generation,
        })
    }

    pub fn clickhouse_cli(&self) -> &ClickhouseCli {

@@ -83,4 +127,180 @@ impl ServerContext {
    pub fn log(&self) -> &Logger {
        &self.log
    }

    pub async fn generation(&self) -> Option<Generation> {
        *self.generation.lock().await
    }
}

pub struct SingleServerContext {
    clickhouse_cli: ClickhouseCli,
    oximeter_client: OximeterClient,
    initialization_lock: Arc<Mutex<()>>,
    log: Logger,
}

impl SingleServerContext {
    pub fn new(
        log: &Logger,
        binary_path: Utf8PathBuf,
        listen_address: SocketAddrV6,
    ) -> Self {
        let clickhouse_cli =
            ClickhouseCli::new(binary_path, listen_address, log);

        let ip = listen_address.ip();
        let address = SocketAddrV6::new(*ip, CLICKHOUSE_TCP_PORT, 0, 0);
        let oximeter_client = OximeterClient::new(address.into(), log);
        let log =
            clickhouse_cli.log.new(slog::o!("component" => "ServerContext"));

        Self {
            clickhouse_cli,
            oximeter_client,
            initialization_lock: Arc::new(Mutex::new(())),
            log,
        }
    }

    pub fn clickhouse_cli(&self) -> &ClickhouseCli {
        &self.clickhouse_cli
    }

    pub fn oximeter_client(&self) -> &OximeterClient {
        &self.oximeter_client
    }

    pub fn initialization_lock(&self) -> Arc<Mutex<()>> {
        self.initialization_lock.clone()
I'm not sure if this usage of a tokio lock is safe or not due to cancellation. It looks like it aligns with the exact usage we have in our
@sunshowers @jgallagher Do you have any ideas here?

Various thoughts; sorry if some of this is obvious, but I don't have much context here so am just hopping in:

On the last point: I think this is "fine" as long as dropshot is configured correctly (i.e., to not cancel handlers). If we wanted this to be correct even under cancellation, I'd probably move the init process into a separate tokio task and manage that either with channels or a sync mutex. Happy to expand on those ideas if it'd be helpful.

Thanks for the input!

Tbh, I'm just moving code around that was already here. I'm not really sure what the intention was initially.

That sounds like a good idea regardless of what the initial intention was. Do you mind expanding a little on those ideas? It'd definitely be helpful

Sure thing! One pattern we've used in a bunch of places is to spawn a long-lived tokio task and then communicate with it via channels. This looks something like (untested and lots of details omitted):

```rust
// kinds of things we can ask the task to do
enum Request {
    DoSomeThing {
        // any inputs from us the task needs
        data: DataNeededToDoSomeThing,
        // a oneshot channel the task uses to send us the result of our request
        response: oneshot::Sender<ResultOfSomeThing>,
    },
}

// the long-lived task: loop over incoming requests and handle them
async fn long_running_task(mut incoming: mpsc::Receiver<Request>) {
    // run until the sending half of `incoming` is dropped
    while let Some(request) = incoming.recv().await {
        match request {
            Request::DoSomeThing { data, response } => {
                let result = do_some_thing(data);
                response.send(result);
            }
        }
    }
}

// our main code: one time up front, create the channel we use to talk to the inner task and spawn that task
let (inner_tx, inner_rx) = mpsc::channel(N); // picking N here can be hard
let join_handle = tokio::spawn(long_running_task(inner_rx));

// ... somewhere else, when we want the task to do something for us ...
let (response_tx, response_rx) = oneshot::channel();
inner_tx.send(Request::DoSomeThing { data, response: response_tx }).await;
let result = response_rx.await;
```

A real example of this pattern (albeit more complex; I'm not finding any super simple ones at the moment) is in the bootstrap agent: here's where we spawn the inner task. It has a couple different channels for incoming requests, so its run loop is a tokio::select over those channels but is otherwise pretty similar to the outline above.

This pattern is nice because regardless of how many concurrent callers try to send messages to the inner task, it itself can do things serially. In my pseudocode above, if the

I really like this pattern. But it has some problems:

A "build your own" variant of the above, in the case where you want at most one instance of some operation, is to use a `std::sync::Mutex` around an `Option` of the task's `JoinHandle`:

```rust
// one time up front, create a sync mutex around an optional tokio task join handle
let task_lock = std::sync::Mutex::new(None);

// ... somewhere else, where we want to do work ...
// acquire the lock
let mut task_lock = task_lock.lock().unwrap();
// if there's a previous task running, is it still running?
let still_running = match task_lock.as_ref() {
    Some(joinhandle) => !joinhandle.is_finished(),
    None => false,
};
if still_running {
    // return a "we're busy" error
}
// any previous task is done; start a new one
*task_lock = Some(tokio::spawn(do_some_work()));
```

This has its own problems; the biggest one is that we can't wait for the result of `do_some_work()`. We don't use this pattern as much. One example is in installinator, where we do want to get the result of previously-completed tasks.

Thanks for the write up John. I think, overall, it's probably simpler to have a long running task and issue requests that way. However, as you mentioned, this has its own problems. However, we know what those problems are and we use this pattern all over sled agent. In this case we can constrain the problem such that we only want to handle one in-flight request at a time, since reconfigurator execution will retry again later anyway. I'd suggest using a flume bounded channel with a size of 0 to act as a rendezvous channel.

excellent! Thanks a bunch for the write up!

@andrewjstone, do we really not want them to block out each other? It'd be problematic to have the db init job trying to run when the generate config one hasn't finished and vice versa, no?
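A hedged sketch of that flume suggestion (not code from this PR; `Request::DoInit` and `worker` are placeholder names, and the only library fact assumed is that `flume::bounded(0)` creates a rendezvous channel): with capacity 0, `try_send` only succeeds while the worker task is parked in `recv_async`, so a second concurrent request fails fast with a "busy" error instead of queueing behind the first.

```rust
use flume::TrySendError;

enum Request {
    // Placeholder request kind; a real one would carry data and a response channel.
    DoInit,
}

async fn worker(incoming: flume::Receiver<Request>) {
    // Handle exactly one request at a time; nothing can queue up behind us.
    while let Ok(request) = incoming.recv_async().await {
        match request {
            Request::DoInit => {
                // ... perform the (possibly long-running) work here ...
            }
        }
    }
}

async fn example() {
    // Capacity 0 makes this a rendezvous channel: a send only succeeds while
    // the worker is actively waiting in `recv_async`.
    let (tx, rx) = flume::bounded::<Request>(0);
    tokio::spawn(worker(rx));

    // In a request handler: if the worker is busy, fail fast instead of queueing.
    match tx.try_send(Request::DoInit) {
        Ok(()) => { /* request handed to the worker */ }
        Err(TrySendError::Full(_)) => { /* worker busy; tell the caller to retry later */ }
        Err(TrySendError::Disconnected(_)) => { /* worker exited */ }
    }
}
```

This keeps at most one request in flight, matching the observation above that reconfigurator execution will simply retry later.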
    }

    pub fn log(&self) -> &Logger {
        &self.log
    }
}

fn read_generation_from_file(path: Utf8PathBuf) -> Result<Option<Generation>> {
    // When the configuration file does not exist yet, this means it's a new server.
    // It won't have a running clickhouse server and no generation number yet.
    if !path.exists() {
        return Ok(None);
    }

    let file = File::open(&path)?;
Can we add context to this error? Something like:
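The suggested diff itself isn't preserved above, but a minimal sketch of the kind of context being asked for, using anyhow's `Context` trait (the exact message wording is illustrative), might be:

```rust
use anyhow::Context;

// Attach the path to the error so a failed open points at the file involved.
let file = File::open(&path)
    .with_context(|| format!("failed to open clickhouse configuration file {path}"))?;
```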
    let reader = BufReader::new(file);
    // We know the generation number is on the top of the file so we only
    // need the first line.
    let first_line = match reader.lines().next() {
        Some(g) => g?,
        // When the clickhouse configuration file exists but has no contents,
        // it means something went wrong when creating the file earlier.
        // We should return because something is definitely broken.
        None => bail!(
            "clickhouse configuration file exists at {}, but is empty",
            path
        ),
    };

    let line_parts: Vec<&str> = first_line.rsplit(':').collect();
    if line_parts.len() != 2 {
        bail!("first line of configuration file is malformed: {}", first_line);
Can we add
    }

    // It's safe to unwrap since we already know `line_parts` contains two items.
    let line_end_part: Vec<&str> =
        line_parts.first().unwrap().split_terminator(" -->").collect();
    if line_end_part.len() != 1 {
        bail!("first line of configuration file is malformed: {}", first_line);
    }

    // It's safe to unwrap since we already know `line_end_part` contains an item.
    let gen_u64: u64 = line_end_part.first().unwrap().parse().map_err(|e| {
        anyhow!(
            concat!(
                "first line of configuration file is malformed: {}; ",
                "error = {}",
            ),
            first_line,
            e
        )
    })?;

    let gen = Generation::try_from(gen_u64)?;

    Ok(Some(gen))
}
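As a usage sketch (not part of the PR): the parsing above implies the first line must have the shape `<!-- generation:N -->`, which the malformed-file tests below also reflect. The temp-file handling here assumes the `camino-tempfile` crate, and the file name and XML body are placeholders.

```rust
use omicron_common::api::external::Generation;

// Illustrative round trip: write a config whose first line carries the
// generation marker, then read it back with `read_generation_from_file`.
fn example_round_trip() -> anyhow::Result<()> {
    let dir = camino_tempfile::Utf8TempDir::new()?;
    let path = dir.path().join("example-config.xml");

    // Only the first line matters to the parser; the rest is arbitrary XML.
    std::fs::write(&path, "<!-- generation:3 -->\n<clickhouse>\n</clickhouse>\n")?;

    let gen = read_generation_from_file(path)?;
    assert_eq!(gen, Some(Generation::from(3)));
    Ok(())
}
```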
#[cfg(test)]
mod tests {
    use super::read_generation_from_file;
    use camino::Utf8PathBuf;
    use clickhouse_admin_types::CLICKHOUSE_SERVER_CONFIG_FILE;
    use omicron_common::api::external::Generation;
    use std::str::FromStr;

    #[test]
    fn test_read_generation_from_file_success() {
        let dir = Utf8PathBuf::from_str("types/testutils")
            .unwrap()
            .join(CLICKHOUSE_SERVER_CONFIG_FILE);
        let generation = read_generation_from_file(dir).unwrap().unwrap();

        assert_eq!(Generation::from(1), generation);
    }

    #[test]
    fn test_read_generation_from_file_none() {
        let dir = Utf8PathBuf::from_str("types/testutils")
            .unwrap()
            .join("i-dont-exist.xml");
        let generation = read_generation_from_file(dir).unwrap();

        assert_eq!(None, generation);
    }

    #[test]
    fn test_read_generation_from_file_malformed_1() {
        let dir = Utf8PathBuf::from_str("types/testutils")
            .unwrap()
            .join("malformed_1.xml");
        let result = read_generation_from_file(dir);
        let error = result.unwrap_err();
        let root_cause = error.root_cause();

        assert_eq!(
            format!("{}", root_cause),
            "first line of configuration file is malformed: <clickhouse>"
        );
    }

    #[test]
    fn test_read_generation_from_file_malformed_2() {
        let dir = Utf8PathBuf::from_str("types/testutils")
            .unwrap()
            .join("malformed_2.xml");
        let result = read_generation_from_file(dir);
        let error = result.unwrap_err();
        let root_cause = error.root_cause();

        assert_eq!(
            format!("{}", root_cause),
            "first line of configuration file is malformed: <!-- generation:bob -->; error = invalid digit found in string"
        );
    }

    #[test]
    fn test_read_generation_from_file_malformed_3() {
        let dir = Utf8PathBuf::from_str("types/testutils")
            .unwrap()
            .join("malformed_3.xml");
        let result = read_generation_from_file(dir);
        let error = result.unwrap_err();
        let root_cause = error.root_cause();

        assert_eq!(
            format!("{}", root_cause),
            "first line of configuration file is malformed: <!-- generation:2 --> -->"
        );
    }
}
This is part of the refactoring, the logs were a bit of a mess.