Skip to content

Commit

Permalink
Merge pull request #83 from weibocom/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
FicoHu authored Mar 11, 2022
2 parents 656e4dd + 078cef4 commit 53e1740
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 152 deletions.
47 changes: 0 additions & 47 deletions discovery/src/resource/mod.rs

This file was deleted.

61 changes: 0 additions & 61 deletions discovery/src/resource/redis.rs

This file was deleted.

59 changes: 27 additions & 32 deletions protocol/src/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl CallbackContext {
}
#[inline]
pub fn response_ok(&self) -> bool {
unsafe { self.inited() && self.response().ok() }
unsafe { self.inited() && self.unchecked_response().ok() }
}
#[inline]
pub fn on_err(&mut self, err: Error) {
Expand All @@ -138,7 +138,7 @@ impl CallbackContext {
}
// 在使用前,先得判断inited
#[inline]
pub unsafe fn response(&self) -> &Command {
pub unsafe fn unchecked_response(&self) -> &Command {
assert!(self.inited());
self.response.assume_init_ref()
}
Expand Down Expand Up @@ -209,7 +209,7 @@ impl CallbackContext {
#[inline]
fn try_drop_response(&mut self) {
if self.ctx.is_inited() {
log::debug!("drop response:{}", unsafe { self.response() });
log::debug!("drop response:{}", unsafe { self.unchecked_response() });
self.ctx
.inited
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
Expand Down Expand Up @@ -302,7 +302,7 @@ impl Display for Context {
write!(
f,
"complete:{} init:{},async :{} try:{} write back:{} context:{}",
self.complete.load(Ordering::Relaxed),
self.complete.load(Ordering::Acquire),
self.is_inited(),
self.drop_on_done(),
self.try_next,
Expand All @@ -312,31 +312,33 @@ impl Display for Context {
}
}

use std::ptr::NonNull;
pub struct CallbackContextPtr {
inner: NonNull<CallbackContext>,
ptr: *mut CallbackContext,
}

impl CallbackContextPtr {
#[inline]
pub fn build_request(&mut self) -> Request {
unsafe { Request::new(self.inner.as_mut()) }
Request::new(self.ptr)
}
//需要在on_done时主动销毁self对象
#[inline]
pub fn async_start_write_back<P: crate::Protocol>(mut self, parser: &P) {
assert!(self.inited());
assert!(self.complete());
let exp = unsafe { self.inner.as_ref().callback.exp_sec() };
if let Some(new) = parser.build_writeback_request(&mut self, exp) {
self.with_request(new);
}
// 还会有异步请求,内存释放交给异步操作完成后的on_done来处理
self.ctx.drop_on_done.store(true, Ordering::Release);
unsafe {
let ctx = self.inner.as_mut();
log::debug!("start write back:{}", self.inner.as_ref());
ctx.continute();
if self.is_write_back() && self.response_ok() {
let exp = self.callback.exp_sec();
if let Some(new) = parser.build_writeback_request(&mut self, exp) {
self.with_request(new);
}
// 还会有异步请求,内存释放交给异步操作完成后的on_done来处理
self.ctx.drop_on_done.store(true, Ordering::Release);
log::debug!("start write back:{}", &*self);
let ctx = self.ptr;
// 必须要提前drop,否则可能会因为continute在drop(self)之前完成,导致在on_done中释放context,
// 此时,此时内存被重置,导致drop_one_done为false,在drop(self)时,再次释放context
drop(self);
unsafe { (&mut *ctx).continute() };
}
}
}
Expand All @@ -345,19 +347,16 @@ impl From<CallbackContext> for CallbackContextPtr {
#[inline]
fn from(ctx: CallbackContext) -> Self {
let ptr = Box::leak(Box::new(ctx));
let inner = unsafe { NonNull::new_unchecked(ptr) };
Self { inner }
Self { ptr }
}
}

impl Drop for CallbackContextPtr {
#[inline]
fn drop(&mut self) {
// 如果ignore为true,说明当前内存手工释放
unsafe {
if !self.inner.as_ref().ctx.drop_on_done() {
self.manual_drop();
}
if !self.ctx.drop_on_done() {
self.manual_drop();
}
}
}
Expand All @@ -366,19 +365,15 @@ impl Deref for CallbackContextPtr {
type Target = CallbackContext;
#[inline]
fn deref(&self) -> &Self::Target {
unsafe {
assert!(!self.inner.as_ref().ctx.drop_on_done());
self.inner.as_ref()
}
//assert!(!self.inner.as_ref().ctx.drop_on_done());
unsafe { &*self.ptr }
}
}
impl DerefMut for CallbackContextPtr {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe {
assert!(!self.inner.as_ref().ctx.drop_on_done());
self.inner.as_mut()
}
//assert!(!self.inner.as_ref().ctx.drop_on_done());
unsafe { &mut *self.ptr }
}
}
unsafe impl Send for CallbackContextPtr {}
Expand Down Expand Up @@ -418,7 +413,7 @@ impl crate::Commander for CallbackContextPtr {
#[inline]
fn response(&self) -> &Command {
assert!(self.inited());
unsafe { self.inner.as_ref().response() }
unsafe { self.unchecked_response() }
}
#[inline]
fn response_mut(&mut self) -> &mut Command {
Expand Down
18 changes: 9 additions & 9 deletions rt/src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,15 @@ impl<F: Future<Output = Result<()>> + Unpin + ReEnter + Debug> Entry<F> {
self.last_rx = now;
}
let ret = Pin::new(&mut self.inner).poll(cx);
if now.elapsed() >= Duration::from_millis(1) {
log::info!(
"tx:{} rx:{} => {:?} elapsed => {:?}",
tx,
rx,
self.inner,
now.elapsed()
);
}
//if now.elapsed() >= Duration::from_millis(1) {
// log::info!(
// "tx:{} rx:{} => {:?} elapsed => {:?}",
// tx,
// rx,
// self.inner,
// now.elapsed()
// );
//}
let (tx_post, rx_post) = (self.inner.num_tx(), self.inner.num_rx());
if tx_post > rx_post {
self.last = Instant::now();
Expand Down
4 changes: 1 addition & 3 deletions stream/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,7 @@ where

if ctx.inited() {
parser.write_response(&mut ctx, client)?;
if ctx.is_write_back() && ctx.response_ok() && ctx.complete() {
ctx.async_start_write_back(parser);
}
ctx.async_start_write_back(parser);
} else {
let req = ctx.request();
if !req.noforward() {
Expand Down

0 comments on commit 53e1740

Please sign in to comment.