Skip to content

Commit

Permalink
Merge pull request #424 from weibocom/dev_support_fnv1_and_new_ketama
Browse files Browse the repository at this point in the history
支持新hash、dist
  • Loading branch information
hustfisher authored Mar 15, 2024
2 parents a5596c3 + 096a6d9 commit f7b6e75
Show file tree
Hide file tree
Showing 10 changed files with 587 additions and 8 deletions.
40 changes: 37 additions & 3 deletions endpoint/src/redisservice/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//use ds::time::Duration;

use std::fmt::Debug;
use std::{collections::HashSet, fmt::Debug};

use serde::{Deserialize, Serialize};
//use sharding::distribution::{DIST_ABS_MODULA, DIST_MODULA};
Expand All @@ -14,6 +14,9 @@ const NO_CHECK_SUFFIX: &str = "-nocheck";
pub struct RedisNamespace {
pub(crate) basic: Basic,
pub(crate) backends: Vec<String>,
// 对于一致性hash,为了确保ip变化后,分片不变,一般会为每组分片取一个name,来确定分片的hash始终固定
#[serde(default)]
pub(crate) backend_names: Vec<String>,
}

#[derive(Debug, Clone, Default, Deserialize, Serialize)]
Expand Down Expand Up @@ -51,8 +54,27 @@ impl RedisNamespace {
return None;
}

// check backends,分离出names
let mut backends = Vec::with_capacity(ns.backends.len());
for b in &mut ns.backends {
let domain_name: Vec<&str> = b.split(" ").collect();
// 后端地址格式: 域名,域名 name, name不能是rm、rs、','开头,避免把异常格式的slave当作name
if domain_name.len() == 2
&& !domain_name[1].starts_with("rm")
&& !domain_name[1].starts_with("rs")
&& !domain_name[1].starts_with(",")
{
backends.push(domain_name[0].to_string());
ns.backend_names.push(domain_name[1].to_string());
}
}
if backends.len() > 0 {
ns.backends = backends;
log::info!("+++ found redis backends with name: {}", cfg);
}

if !ns.validate_and_correct() {
log::error!("shards {} is not power of two: {}", ns.backends.len(), cfg);
log::error!("malformed names or shards {}: {}", ns.backends.len(), cfg);
return None;
}

Expand Down Expand Up @@ -86,8 +108,8 @@ impl RedisNamespace {
#[inline(always)]
fn validate_and_correct(&mut self) -> bool {
let dist = &self.basic.distribution;
// 需要检测dist时(默认场景),对于range/modrange类型的dist需要限制后端数量为2^n

// 需要检测dist时(默认场景),对于range/modrange类型的dist需要限制后端数量为2^n
if dist.starts_with(sharding::distribution::DIST_RANGE)
|| dist.starts_with(sharding::distribution::DIST_MOD_RANGE)
{
Expand All @@ -103,6 +125,18 @@ impl RedisNamespace {
}
}

// 如果backend有name,则所有的后端都必须有name,且name不能重复
if self.backend_names.len() > 0 {
if self.backend_names.len() != self.backends.len() {
return false;
}
let mut names_unique = HashSet::with_capacity(self.backend_names.len());
names_unique.extend(self.backend_names.clone());
if names_unique.len() != self.backend_names.len() {
return false;
}
}

true
}
}
7 changes: 6 additions & 1 deletion endpoint/src/redisservice/topo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,12 @@ where
fn update(&mut self, namespace: &str, cfg: &str) {
if let Some(ns) = RedisNamespace::try_from(cfg) {
self.hasher = Hasher::from(&ns.basic.hash);
self.distribute = Distribute::from(ns.basic.distribution.as_str(), &ns.backends);
let backends = match ns.backend_names.len() {
0 => &ns.backends,
_ => &ns.backend_names,
};
log::debug!("+++ dist with backends:{:?}", backends);
self.distribute = Distribute::from(ns.basic.distribution.as_str(), backends);
self.cfg.update(namespace, ns);
}
}
Expand Down
15 changes: 12 additions & 3 deletions sharding/src/distribution/consistent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,36 @@ impl Consistent {
}

pub fn from<T: Deref<Target = str>>(shards: &[T]) -> Self {
Consistent::new(shards, false)
}

pub fn new<T: Deref<Target = str>>(shards: &[T], origin_alg: bool) -> Self {
log::debug!("+++ use ketama with origin:{}", origin_alg);
let mut map = BTreeMap::default();
for idx in 0..shards.len() {
let factor = 40;
for i in 0..factor {
let data: String = shards[idx].to_string() + "-" + &i.to_string();
let out_bytes = md5::compute(data.as_str());
for j in 0..4 {
let hash = (((out_bytes[3 + j * 4] & 0xFF) as i64) << 24)
let mut hash = (((out_bytes[3 + j * 4] & 0xFF) as i64) << 24)
| (((out_bytes[2 + j * 4] & 0xFF) as i64) << 16)
| (((out_bytes[1 + j * 4] & 0xFF) as i64) << 8)
| ((out_bytes[0 + j * 4] & 0xFF) as i64);

let mut hash = hash.wrapping_rem(i32::MAX as i64);
// twemproxy 为代表的原始版ketama算法,不需要此计算, 但业务sdk的修正版需要此计算
if !origin_alg {
hash = hash.wrapping_rem(i32::MAX as i64);
}

if hash < 0 {
hash = hash.wrapping_mul(-1);
}

map.insert(hash, idx);
}
}
}

Self { buckets: map }
}
}
1 change: 1 addition & 0 deletions sharding/src/distribution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ impl Distribute {
"modula" => Self::Modula(Modula::from(names.len(), false)),
"absmodula" => Self::Modula(Modula::from(names.len(), true)),
"ketama" => Self::Consistent(Consistent::from(names)),
"ketama_origin" => Self::Consistent(Consistent::new(names, true)),
"range" => Self::Range(Range::from(num, names.len())),
"modrange" => Self::ModRange(ModRange::from(num, names.len())),
"splitmod" => Self::SplitMod(SplitMod::from(num, names.len())),
Expand Down
19 changes: 19 additions & 0 deletions sharding/src/hash/fnv1.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/// 按需支持fnv1系列所有相关的hash算法,目前支持fnv1a_64
#[derive(Debug, Default, Clone)]
pub struct Fnv1aF64;

const FNV_64_INIT: u64 = 0xcbf29ce484222325;
const FNV_64_PRIME: u64 = 0x100000001b3;

impl super::Hash for Fnv1aF64 {
fn hash<S: crate::HashKey>(&self, key: &S) -> i64 {
let mut hash = FNV_64_INIT as u32;
for i in 0..key.len() {
hash ^= key.at(i) as u32;
hash = hash.wrapping_mul(FNV_64_PRIME as u32);
}

hash as i64
}
}
5 changes: 4 additions & 1 deletion sharding/src/hash/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod bkdrsub;
pub mod crc32;
pub mod crc32local;
pub mod crc64;
pub mod fnv1;
pub mod lbcrc32local;
pub mod padding;
pub mod random;
Expand All @@ -26,7 +27,7 @@ pub mod crc;

use enum_dispatch::enum_dispatch;

use self::{bkdrsub::Bkdrsub, crc64::Crc64};
use self::{bkdrsub::Bkdrsub, crc64::Crc64, fnv1::Fnv1aF64};

// 占位hash,主要用于兼容服务框架,供mq等业务使用
pub const HASH_PADDING: &str = "padding";
Expand Down Expand Up @@ -84,6 +85,7 @@ pub enum Hasher {
Rawcrc32local(Rawcrc32local), // raw or crc32local
Crc32Abs(Crc32Abs), // crc32abs: 基于i32转换,然后直接取abs;其他走i64提升为正数
Crc64(Crc64), // Crc64 算法,对整个key做crc64计算
Fnv1aF64(Fnv1aF64),
Random(RandomHash), // random hash
RawSuffix(RawSuffix),
}
Expand Down Expand Up @@ -130,6 +132,7 @@ impl Hasher {
"crc32abs" => Self::Crc32Abs(Default::default()),
"crc64" => Self::Crc64(Default::default()),
"random" => Self::Random(Default::default()),
"fnv1a_64" => Self::Fnv1aF64(Default::default()),
_ => {
// 默认采用mc的crc32-s hash
log::error!("found unknown hash:{}, use crc32-short instead", alg);
Expand Down
Loading

0 comments on commit f7b6e75

Please sign in to comment.