Skip to content

Commit

Permalink
feat:协程替代doElectionTicker和doHeartBeatTicker线程 (#29)
Browse files Browse the repository at this point in the history

引入协程之后发现存在超时,测试分析

 bug fix:fiber

引入协程之后发现会存在leader在某几次定时时间为24ms的情况下睡眠了约300ms的时间,经过排查发现是doElectionTicker函数占用了太多时间导致定时不及时


---------

Co-authored-by: Ornamrr <[email protected]>
Co-authored-by: siwuxie <[email protected]>
  • Loading branch information
3 people authored Feb 22, 2024
1 parent af9e038 commit ade4d7c
Show file tree
Hide file tree
Showing 48 changed files with 3,254 additions and 269 deletions.
14 changes: 5 additions & 9 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ set(LIBRARY_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/lib)
# 设置项目编译头文件搜索路径 -I
# 目前可能存在路径污染的问题,需要进一步解决
include_directories(${PROJECT_SOURCE_DIR}/src/common/include)
include_directories(${PROJECT_SOURCE_DIR}/src/fiber/include)
include_directories(${PROJECT_SOURCE_DIR}/src/rpc/include)
include_directories(${PROJECT_SOURCE_DIR}/example)
include_directories(${PROJECT_SOURCE_DIR}/src/raftCore/include)
Expand All @@ -35,21 +36,16 @@ add_subdirectory(src)
# example包含了使用的示例代码
add_subdirectory(example)


add_library(skip_list_on_raft STATIC ${src_rpc} ${rpc_example} ${raftsource} ${src_raftCore} ${src_raftRpcPro})

target_link_libraries(skip_list_on_raft muduo_net muduo_base pthread )


add_library(skip_list_on_raft STATIC ${src_rpc} ${src_fiber} ${rpc_example} ${raftsource} ${src_raftCore} ${src_raftRpcPro})
target_link_libraries(skip_list_on_raft muduo_net muduo_base pthread dl)
# 添加格式化目标 start
# from : https://blog.csdn.net/guotianqing/article/details/121661067

add_custom_target(format
COMMAND bash ${PROJECT_SOURCE_DIR}/format.sh
COMMAND echo "format done!"
WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}
COMMAND echo "format done!"
)


# 添加格式化目标 end

# 添加格式化目标 end
1 change: 1 addition & 0 deletions example/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
add_subdirectory(fiberExample)

add_subdirectory(rpcExample)

Expand Down
23 changes: 23 additions & 0 deletions example/fiberExample/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
set(
LIB_LIB
fiber_lib
pthread
dl
)

add_executable(test_server server.cpp)
target_link_libraries(test_server ${LIB_LIB})
#add_dependencies(test_server monsoon)

add_executable(test_scheduler test_scheduler.cpp)
target_link_libraries(test_scheduler ${LIB_LIB})
#add_dependencies(test_scheduler monsoon)

add_executable(test_iomanager test_iomanager.cpp)
target_link_libraries(test_iomanager ${LIB_LIB})
#add_dependencies(test_iomanager monsoon)

add_executable(test_hook test_hook.cpp)
target_link_libraries(test_hook ${LIB_LIB})
#add_dependencies(test_hook monsoon)

87 changes: 87 additions & 0 deletions example/fiberExample/server.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
#include <arpa/inet.h>
#include <fcntl.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <iostream>
#include <stack>
#include "monsoon.h"

static int listen_sock = -1;

void test_accept();

// task
void watch_io_read() { monsoon::IOManager::GetThis()->addEvent(listen_sock, monsoon::READ, test_accept); }

void test_accept() {
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
socklen_t len = sizeof(addr);
int fd = accept(listen_sock, (struct sockaddr *)&addr, &len);
if (fd < 0) {
std::cout << "fd = " << fd << ",accept error" << std::endl;
} else {
std::cout << "fd = " << fd << ",accept success" << std::endl;
fcntl(fd, F_SETFL, O_NONBLOCK);
monsoon::IOManager::GetThis()->addEvent(fd, monsoon::READ, [fd]() {
char buffer[1024];
memset(buffer, 0, sizeof(buffer));
while (true) {
int ret = recv(fd, buffer, sizeof(buffer), 0);
if (ret > 0) {
std::cout << "client say: " << buffer << std::endl;
ret = send(fd, buffer, ret, 0);
}
if (ret <= 0) {
if (errno == EAGAIN) continue;
close(fd);
break;
}
}
});
}
monsoon::IOManager::GetThis()->scheduler(watch_io_read);
}

void test_iomanager() {
int port = 8080;
struct sockaddr_in svr_addr;
// socklen_t cli_len = sizeof(cli_addr);
listen_sock = socket(AF_INET, SOCK_STREAM, 0);
if (listen_sock < 0) {
std::cout << "creating listen socket error" << std::endl;
return;
}

int opt = 1;
setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

memset((char *)&svr_addr, 0, sizeof(svr_addr));
svr_addr.sin_family = AF_INET;
svr_addr.sin_port = htons(port);
svr_addr.sin_addr.s_addr = INADDR_ANY;

if (bind(listen_sock, (struct sockaddr *)&svr_addr, sizeof(svr_addr)) < 0) {
std::cout << "bind error" << std::endl;
return;
}

if (listen(listen_sock, 1024) < 0) {
std::cout << "listen error" << std::endl;
return;
} else {
std::cout << "listen success on port: " << port << std::endl;
}

fcntl(listen_sock, F_SETFL, O_NONBLOCK);

monsoon::IOManager iomanager;
iomanager.addEvent(listen_sock, monsoon::READ, test_accept);
}

int main(int argc, char *argv[]) {
test_iomanager();
return 0;
}
77 changes: 77 additions & 0 deletions example/fiberExample/test_hook.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include "monsoon.h"

const std::string LOG_HEAD = "[TASK] ";

void test_sleep() {
std::cout << LOG_HEAD << "tid = " << monsoon::GetThreadId() << ",test_fiber_sleep begin" << std::endl;
monsoon::IOManager iom(1, true);

iom.scheduler([] {
while (1) {
sleep(6);
std::cout << "task 1 sleep for 6s" << std::endl;
}
});

iom.scheduler([] {
while (1) {
sleep(2);
std::cout << "task2 sleep for 2s" << std::endl;
}
});

std::cout << LOG_HEAD << "tid = " << monsoon::GetThreadId() << ",test_fiber_sleep finish" << std::endl;
}

void test_sock() {
int sock = socket(AF_INET, SOCK_STREAM, 0);

sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(80);
inet_pton(AF_INET, "36.152.44.96", &addr.sin_addr.s_addr);

std::cout << "begin connect" << std::endl;
int rt = connect(sock, (const sockaddr *)&addr, sizeof(addr));
std::cout << "connect rt=" << rt << " errno=" << errno << std::endl;

if (rt) {
return;
}

const char data[] = "GET / HTTP/1.0\r\n\r\n";
rt = send(sock, data, sizeof(data), 0);
std::cout << "send rt=" << rt << " errno=" << errno << std::endl;

if (rt <= 0) {
return;
}

std::string buff;
buff.resize(4096);

rt = recv(sock, &buff[0], buff.size(), 0);
std::cout << "recv rt=" << rt << " errno=" << errno << std::endl;

if (rt <= 0) {
return;
}

buff.resize(rt);
std::cout << "--------------------------------" << std::endl;
std::cout << buff << std::endl;
std::cout << "--------------------------------" << std::endl;
}

int main() {
// monsoon::IOManager iom;
// iom.scheduler(test_sock);

test_sleep();
}
89 changes: 89 additions & 0 deletions example/fiberExample/test_iomanager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include "monsoon.h"

int sockfd;
void watch_io_read();

// 写事件回调,只执行一次,用于判断非阻塞套接字connect成功
void do_io_write() {
std::cout << "write callback" << std::endl;
int so_err;
socklen_t len = size_t(so_err);
getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &so_err, &len);
if (so_err) {
std::cout << "connect fail" << std::endl;
return;
}
std::cout << "connect success" << std::endl;
}

// 读事件回调,每次读取之后如果套接字未关闭,需要重新添加
void do_io_read() {
std::cout << "read callback" << std::endl;
char buf[1024] = {0};
int readlen = 0;
readlen = read(sockfd, buf, sizeof(buf));
if (readlen > 0) {
buf[readlen] = '\0';
std::cout << "read " << readlen << " bytes, read: " << buf << std::endl;
} else if (readlen == 0) {
std::cout << "peer closed";
close(sockfd);
return;
} else {
std::cout << "err, errno=" << errno << ", errstr=" << strerror(errno) << std::endl;
close(sockfd);
return;
}
// read之后重新添加读事件回调,这里不能直接调用addEvent,因为在当前位置fd的读事件上下文还是有效的,直接调用addEvent相当于重复添加相同事件
monsoon::IOManager::GetThis()->scheduler(watch_io_read);
}

void watch_io_read() {
std::cout << "watch_io_read" << std::endl;
monsoon::IOManager::GetThis()->addEvent(sockfd, monsoon::READ, do_io_read);
}

void test_io() {
sockfd = socket(AF_INET, SOCK_STREAM, 0);
monsoon::CondPanic(sockfd > 0, "scoket should >0");
fcntl(sockfd, F_SETFL, O_NONBLOCK);

sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(80);
inet_pton(AF_INET, "36.152.44.96", &servaddr.sin_addr.s_addr);

int rt = connect(sockfd, (const sockaddr *)&servaddr, sizeof(servaddr));
if (rt != 0) {
if (errno == EINPROGRESS) {
std::cout << "EINPROGRESS" << std::endl;
// 注册写事件回调,只用于判断connect是否成功
// 非阻塞的TCP套接字connect一般无法立即建立连接,要通过套接字可写来判断connect是否已经成功
monsoon::IOManager::GetThis()->addEvent(sockfd, monsoon::WRITE, do_io_write);
// 注册读事件回调,注意事件是一次性的
monsoon::IOManager::GetThis()->addEvent(sockfd, monsoon::READ, do_io_read);
} else {
std::cout << "connect error, errno:" << errno << ", errstr:" << strerror(errno) << std::endl;
}
} else {
std::cout << "else, errno:" << errno << ", errstr:" << strerror(errno) << std::endl;
}
}

void test_iomanager() {
monsoon::IOManager iom;
// monsoon::IOManager iom(10); // 演示多线程下IO协程在不同线程之间切换
iom.scheduler(test_io);
}

int main(int argc, char *argv[]) {
test_iomanager();

return 0;
}
Loading

0 comments on commit ade4d7c

Please sign in to comment.