diff --git a/config.h.in b/config.h.in index 4360726e..58f096d7 100644 --- a/config.h.in +++ b/config.h.in @@ -100,6 +100,9 @@ /* The default port for Spindle */ #undef SPINDLE_PORT +/* The default number of roots in a cobo network */ +#undef SPINDLE_ROOTS + /* Define to 1 if you have the ANSI C header files. */ #undef STDC_HEADERS diff --git a/configure b/configure index 12cf007f..720fc745 100755 --- a/configure +++ b/configure @@ -819,6 +819,7 @@ enable_fast_install with_gnu_ld with_sysroot enable_libtool_lock +with_default_roots with_default_port with_default_num_ports with_localstorage @@ -1526,6 +1527,8 @@ Optional Packages: --with-gnu-ld assume the C compiler uses GNU ld [default=no] --with-sysroot=DIR Search for dependent libraries within DIR (or the compiler's sysroot if not specified). + --with-default-roots=NUM + The number of roots in a cobo network --with-default-port=NUM TCP/IP Port for Spindle server communication --with-default-numports=NUM Number of TCP/IP ports to scan for Spindle server @@ -16192,11 +16195,20 @@ fi #Include common ops #Configure operations that are common between the Spindle and Spindle-client configurations #Network port and local storage location +DEFAULT_ROOTS=1 DEFAULT_PORT=21940 DEFAULT_LOC='$TMPDIR' DEFAULT_NUM_COBO_PORTS=25 +# Check whether --with-default-roots was given. +if test "${with_default_roots+set}" = set; then : + withval=$with_default_roots; SPINDLE_ROOTS=${withval} +else + SPINDLE_ROOTS=$DEFAULT_ROOTS +fi + + # Check whether --with-default-port was given. if test "${with_default_port+set}" = set; then : withval=$with_default_port; SPINDLE_PORT=${withval} @@ -16221,6 +16233,11 @@ else fi +cat >>confdefs.h <<_ACEOF +#define SPINDLE_ROOTS $SPINDLE_ROOTS +_ACEOF + + cat >>confdefs.h <<_ACEOF #define SPINDLE_PORT $SPINDLE_PORT _ACEOF diff --git a/configure.common.ac b/configure.common.ac index 501a2de7..983ce67a 100644 --- a/configure.common.ac +++ b/configure.common.ac @@ -1,9 +1,14 @@ #Configure operations that are common between the Spindle and Spindle-client configurations #Network port and local storage location +DEFAULT_ROOTS=1 DEFAULT_PORT=21940 DEFAULT_LOC='$TMPDIR' DEFAULT_NUM_COBO_PORTS=25 +AC_ARG_WITH(default-roots, + [AS_HELP_STRING([--with-default-roots=NUM],[The number of roots in a cobo network])], + [SPINDLE_ROOTS=${withval}], + [SPINDLE_ROOTS=$DEFAULT_ROOTS]) AC_ARG_WITH(default-port, [AS_HELP_STRING([--with-default-port=NUM],[TCP/IP Port for Spindle server communication])], [SPINDLE_PORT=${withval}], @@ -16,6 +21,7 @@ AC_ARG_WITH(localstorage, [AS_HELP_STRING([--with-localstorage=DIR],[Directory on back-ends for storing relocated files])], [SPINDLE_LOC=${withval}], [SPINDLE_LOC=$DEFAULT_LOC]) +AC_DEFINE_UNQUOTED([SPINDLE_ROOTS],[$SPINDLE_ROOTS],[The default number of roots in a cobo network]) AC_DEFINE_UNQUOTED([SPINDLE_PORT],[$SPINDLE_PORT],[The default port for Spindle]) AC_DEFINE_UNQUOTED([NUM_COBO_PORTS],[$NUM_COBO_PORTS],[Number of ports for COBO to search for an open port]) AC_DEFINE_UNQUOTED([SPINDLE_MAX_PORT],[$(($SPINDLE_PORT + $NUM_COBO_PORTS - 1))],[The maximum port value]) diff --git a/src/client/config.h.in b/src/client/config.h.in index 1b497e6b..de8674b8 100644 --- a/src/client/config.h.in +++ b/src/client/config.h.in @@ -94,6 +94,9 @@ /* The default port for Spindle */ #undef SPINDLE_PORT +/* The default number of roots in a cobo network */ +#undef SPINDLE_ROOTS + /* Define to 1 if you have the ANSI C header files. */ #undef STDC_HEADERS diff --git a/src/client/configure b/src/client/configure index e648e9ad..12609032 100755 --- a/src/client/configure +++ b/src/client/configure @@ -784,6 +784,7 @@ enable_fast_install with_gnu_ld with_sysroot enable_libtool_lock +with_default_roots with_default_port with_default_num_ports with_localstorage @@ -1469,6 +1470,8 @@ Optional Packages: --with-gnu-ld assume the C compiler uses GNU ld [default=no] --with-sysroot=DIR Search for dependent libraries within DIR (or the compiler's sysroot if not specified). + --with-default-roots=NUM + The number of roots in a cobo network --with-default-port=NUM TCP/IP Port for Spindle server communication --with-default-numports=NUM Number of TCP/IP ports to scan for Spindle server @@ -12401,11 +12404,20 @@ CC="$lt_save_CC" #Configure operations that are common between the Spindle and Spindle-client configurations #Network port and local storage location +DEFAULT_ROOTS=1 DEFAULT_PORT=21940 DEFAULT_LOC='$TMPDIR' DEFAULT_NUM_COBO_PORTS=25 +# Check whether --with-default-roots was given. +if test "${with_default_roots+set}" = set; then : + withval=$with_default_roots; SPINDLE_ROOTS=${withval} +else + SPINDLE_ROOTS=$DEFAULT_ROOTS +fi + + # Check whether --with-default-port was given. if test "${with_default_port+set}" = set; then : withval=$with_default_port; SPINDLE_PORT=${withval} @@ -12430,6 +12442,11 @@ else fi +cat >>confdefs.h <<_ACEOF +#define SPINDLE_ROOTS $SPINDLE_ROOTS +_ACEOF + + cat >>confdefs.h <<_ACEOF #define SPINDLE_PORT $SPINDLE_PORT _ACEOF diff --git a/src/cobo/cobo.c b/src/cobo/cobo.c index 730ac563..fca39c51 100644 --- a/src/cobo/cobo.c +++ b/src/cobo/cobo.c @@ -55,6 +55,8 @@ Place, Suite 330, Boston, MA 02111-1307 USA #define COBO_CONNECT_TIMELIMIT (600) /* seconds -- wait this long before giving up for good */ #endif +#define ENABLE_HANDSHAKE + #if defined(_IA64_) #undef htons #undef ntohs @@ -109,6 +111,15 @@ static int cobo_num_child = 0; /* number of children */ static int* cobo_child_incl = NULL; /* number of children each child is responsible for (includes itself) */ static int cobo_num_child_incl = 0; /* total number of children this node is responsible for */ +/* forest data structures */ +static int cobo_is_forest_opened = 0; +static int cobo_num_forest_childs = 0; /* number of clockwise direction peers (children) */ +static int* cobo_forest_childs = NULL; /* ranks of clockwise direction peers**/ +static int* cobo_forest_childs_fd = NULL; /* sockets to clockwise direction peers */ +//static int cobo_num_forest_parents = 0; /* (= cobo_num_forest_childs) */ +static int* cobo_forest_parents = NULL; /* ranks of counterclockwise direction peers (parents) */ +static int* cobo_forest_parents_fd = NULL; /* sockets to counterclockwise direction peers */ + static int cobo_root_fd = -1; static handshake_protocol_t cobo_handshake; @@ -141,9 +152,11 @@ static double cobo_getsecs(struct timeval* tv2, struct timeval* tv1) /* Fills in timeval via gettimeofday */ static void cobo_gettimeofday(struct timeval* tv) { +#if 0 if (gettimeofday(tv, NULL) < 0) { err_printf("Getting time (gettimeofday() %m errno=%d)\n", errno); } +#endif } /* Reads environment variable, bails if not set */ @@ -484,7 +497,7 @@ static int cobo_connect_hostname(char* hostname, int rank) int port = cobo_ports[i]; /* attempt to connect to hostname on this port */ - debug_printf3("Trying rank %d port %d on %s\n", rank, port, hostname); + debug_printf3("Trying rank %d port %d on %s\n", rank, port, hostname); /* s = cobo_connect(*(struct in_addr *) (*he->h_addr_list), htons(port)); */ s = cobo_connect(saddr, htons(port), connect_timeout); if (s != -1) { @@ -497,9 +510,19 @@ static int cobo_connect_hostname(char* hostname, int rank) case HSHAKE_SUCCESS: break; case HSHAKE_INTERNAL_ERROR: +#if 1 /* Kento modified*/ + /*Max listen process is 1, Between accept() and close(listen_sockfd), + another client process can connect to this port. Becase the server will call close(liste_sockfd) later, + this handshake faile with HSHAKE_INTERNAL_ERROR, so we try another port. + */ + debug_printf3("Internal error doing handshake: %s", spindle_handshake_last_error_str()); + close(s); + continue; +#else err_printf("Internal error doing handshake: %s", spindle_handshake_last_error_str()); exit(-1); break; +#endif case HSHAKE_DROP_CONNECTION: debug_printf3("Handshake said to drop connection\n"); close(s); @@ -510,7 +533,7 @@ static int cobo_connect_hostname(char* hostname, int rank) default: assert(0 && "Unknown return value from handshake_server\n"); } - + /* write cobo service id */ if (!test_failed && cobo_write_fd_w_suppress(s, &cobo_serviceid, sizeof(cobo_serviceid), 1) < 0) { debug_printf3("Writing service id to %s on port %d\n", @@ -745,17 +768,70 @@ static int cobo_compute_children_root_C1() } #endif -/* open socket tree across tasks */ -static int cobo_open_tree() +static int cobo_create_socket() { - /* create a socket to accept connection from parent IPPROTO_TCP */ - int sockfd = socket(AF_INET, SOCK_STREAM, 0); - if (sockfd < 0) { - err_printf("Creating parent socket (socket() %m errno=%d)\n", - errno); + /* create a socket to accept connection from parent IPPROTO_TCP */ + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (sockfd < 0) { + err_printf("Creating parent socket (socket() %m errno=%d)\n", + errno); + exit(1); + } + return sockfd; +} + +static void cobo_bind(int sockfd) +{ + /* TODO: could recycle over port numbers, trying to bind to one for some time */ + /* try to bind the socket to one the ports in our allowed range */ + int i = 0; + int port_is_bound = 0; + while (i < cobo_num_ports && !port_is_bound) { + /* pick a port */ + int port = cobo_ports[i]; + i++; + + /* set up an address using our selected port */ + struct sockaddr_in sin; + memset(&sin, 0, sizeof(sin)); + sin.sin_family = AF_INET; + sin.sin_addr.s_addr = htonl(INADDR_ANY); + sin.sin_port = htons(port); + + /* attempt to bind a socket on this port */ + if (bind(sockfd, (struct sockaddr *) &sin, sizeof(sin)) < 0) { + debug_printf3("Binding parent socket (bind() %m errno=%d) port=%d\n", + errno, port); + continue; + } + + /* bound and listening on our port */ + debug_printf3("Opened socket on port %d\n", port); + port_is_bound = 1; + } + + /* failed to bind to a port, this is fatal */ + if (!port_is_bound) { + /* TODO: would like to send an abort back to server */ + err_printf("Failed to open socket on any port\n"); exit(1); } +} + +static void cobo_listen(int sockfd) +{ + /* set the socket to listen for connections */ + if (listen(sockfd, 1) < 0) { + cobo_dbg_printf("Setting parent socket to listen (listen() %m errno=%d)", errno); + exit(1); + } + return; +} + +/* bind and listen socket */ +static void cobo_bind_and_listen(int sockfd) +{ /* TODO: could recycle over port numbers, trying to bind to one for some time */ /* try to bind the socket to one the ports in our allowed range */ int i = 0; @@ -797,29 +873,32 @@ static int cobo_open_tree() err_printf("Failed to open socket on any port\n"); exit(1); } +} +static int cobo_accept_and_handshake(int sockfd) +{ + int accepted_sockfd; /* accept a connection from parent and receive socket table */ int reply_timeout = cobo_connect_timeout * 100; int have_parent = 0; while (!have_parent) { struct sockaddr parent_addr; socklen_t parent_len = sizeof(parent_addr); - cobo_parent_fd = accept(sockfd, (struct sockaddr *) &parent_addr, &parent_len); - + accepted_sockfd = accept(sockfd, (struct sockaddr *) &parent_addr, &parent_len); _cobo_opt_socket(sockfd); /* handshake/authenticate our connection to make sure it one of our processes */ - int result = spindle_handshake_server(cobo_parent_fd, &cobo_handshake, cobo_sessionid); + int result = spindle_handshake_server(accepted_sockfd, &cobo_handshake, cobo_sessionid); switch (result) { case HSHAKE_SUCCESS: break; case HSHAKE_INTERNAL_ERROR: - err_printf("Internal error doing handshake: %s", spindle_handshake_last_error_str()); + err_printf("Internal error doing handshake: %s", spindle_handshake_last_error_str()); exit(-1); break; case HSHAKE_DROP_CONNECTION: debug_printf3("Handshake said to drop connection\n"); - close(cobo_parent_fd); + close(accepted_sockfd); continue; case HSHAKE_ABORT: handle_security_error(spindle_handshake_last_error_str()); @@ -830,55 +909,79 @@ static int cobo_open_tree() /* read the service id */ unsigned int received_serviceid = 0; - if (cobo_read_fd_w_timeout(cobo_parent_fd, &received_serviceid, sizeof(received_serviceid), reply_timeout) < 0) { + if (cobo_read_fd_w_timeout(accepted_sockfd, &received_serviceid, sizeof(received_serviceid), reply_timeout) < 0) { debug_printf3("Receiving service id from new connection failed\n"); - close(cobo_parent_fd); + close(accepted_sockfd); continue; } /* read the session id */ uint64_t received_sessionid = 0; - if (cobo_read_fd_w_timeout(cobo_parent_fd, &received_sessionid, sizeof(received_sessionid), reply_timeout) < 0) { + if (cobo_read_fd_w_timeout(accepted_sockfd, &received_sessionid, sizeof(received_sessionid), reply_timeout) < 0) { debug_printf3("Receiving session id from new connection failed\n"); - close(cobo_parent_fd); + close(accepted_sockfd); continue; } /* check that we got the expected sesrive and session ids */ /* TODO: reply with some sort of error message if no match? */ if (received_serviceid != cobo_serviceid || received_sessionid != cobo_sessionid) { - close(cobo_parent_fd); + close(accepted_sockfd); continue; } /* write our service id back as a reply */ - if (cobo_write_fd_w_suppress(cobo_parent_fd, &cobo_serviceid, sizeof(cobo_serviceid), 1) < 0) { + if (cobo_write_fd_w_suppress(accepted_sockfd, &cobo_serviceid, sizeof(cobo_serviceid), 1) < 0) { debug_printf3("Writing service id to new connection failed\n"); - close(cobo_parent_fd); + close(accepted_sockfd); continue; } /* write our accept id back as a reply */ - if (cobo_write_fd_w_suppress(cobo_parent_fd, &cobo_acceptid, sizeof(cobo_acceptid), 1) < 0) { + if (cobo_write_fd_w_suppress(accepted_sockfd, &cobo_acceptid, sizeof(cobo_acceptid), 1) < 0) { debug_printf3("Writing accept id to new connection failed\n"); - close(cobo_parent_fd); + close(accepted_sockfd); continue; } /* our parent may have dropped us if he was too impatient waiting for our reply, * read his ack to know that he completed the connection */ unsigned int ack = 0; - if (cobo_read_fd_w_timeout(cobo_parent_fd, &ack, sizeof(ack), reply_timeout) < 0) { + if (cobo_read_fd_w_timeout(accepted_sockfd, &ack, sizeof(ack), reply_timeout) < 0) { debug_printf3("Receiving ack to finalize connection\n"); - close(cobo_parent_fd); + close(accepted_sockfd); continue; } /* if we get here, we've got a good connection to our parent */ have_parent = 1; } + return accepted_sockfd; +} - /* we've got the connection to our parent, so close the listening socket */ +static int cobo_connect_rank(int rank) +{ + char* hostname = cobo_expand_hostname(rank); + int sockfd = cobo_connect_hostname(hostname, rank); + if (sockfd == -1) { + err_printf("Failed to connect to child (rank %d) on %s failed\n", + rank, hostname); + exit(1); + } + free(hostname); + return sockfd; +} + + +/* open socket tree across tasks */ +static int cobo_open_tree() +{ + int sockfd; + int i = 0; + + sockfd = cobo_create_socket(); + cobo_bind_and_listen(sockfd); + cobo_parent_fd = cobo_accept_and_handshake(sockfd); close(sockfd); cobo_gettimeofday(&tree_start); @@ -923,15 +1026,12 @@ static int cobo_open_tree() /* given our rank and the number of ranks, compute the ranks of our children */ cobo_compute_children(); /* cobo_compute_children_root_C1(); */ - /* for each child, open socket connection and forward hostname table */ for(i=0; i < cobo_num_child; i++) { /* get rank and hostname for this child */ int c = cobo_child[i]; char* child_hostname = cobo_expand_hostname(c); - debug_printf3("%d: on COBO%02d: connect to child #%02d (%s)\n",i,cobo_me,c,child_hostname); - /* connect to child */ cobo_child_fd[i] = cobo_connect_hostname(child_hostname, c); if (cobo_child_fd[i] == -1) { @@ -952,7 +1052,6 @@ static int cobo_open_tree() /* free the child hostname string */ free(child_hostname); } - return COBO_SUCCESS; } @@ -980,6 +1079,82 @@ static int cobo_close_tree() return COBO_SUCCESS; } +/* open socket forest */ +int cobo_open_forest() +{ + int i; + /* compute ranks of peers */ + { + int rank_offset = 1; + while (rank_offset <= cobo_nprocs - 1) { + cobo_num_forest_childs++; + rank_offset = rank_offset << 1; + } + i = 0; + rank_offset = 1; + cobo_forest_childs = (int*)cobo_malloc(sizeof(int) * cobo_num_forest_childs, "clockwise peer buffer"); + cobo_forest_parents = (int*)cobo_malloc(sizeof(int) * cobo_num_forest_childs, "counterclockwise peer buffer"); + while (rank_offset <= cobo_nprocs - 1) { + cobo_forest_childs[i] = cobo_me + rank_offset; + cobo_forest_parents[i] = (cobo_me + cobo_nprocs - rank_offset) % cobo_nprocs; + rank_offset = rank_offset << 1; + i++; + } + } + + /* Create Binomial Forest overlay network */ + int sockfd; + int num_successive_listen_rank_num = 1; + cobo_forest_childs_fd = (int*)cobo_malloc(sizeof(int) * cobo_num_forest_childs, "clockwise peer fd buffer"); + cobo_forest_parents_fd = (int*)cobo_malloc(sizeof(int) * cobo_num_forest_childs, "counterclockwise peer fd buffer"); + + sockfd = cobo_create_socket(); + cobo_bind(sockfd); + for (i = 0; i < cobo_num_forest_childs; i++) { + int connection_group_id = cobo_me / num_successive_listen_rank_num; + int conn_rank = cobo_forest_parents[i]; + /* TODO: Reuse alread binded socket in every iterations + => For now: Create and close sockets every iterations + */ + /* TODO: Reuse sockets opened in cobo_open_tree() + => For now: Sockets (connections) redundancy in tree and forest + */ + + if (connection_group_id % 2 == 0) { + /* Active connection => Passive connection */ + cobo_forest_childs_fd[i] = cobo_connect_rank(conn_rank); + cobo_listen(sockfd); + cobo_forest_parents_fd[i] = cobo_accept_and_handshake(sockfd); + } else { + /* Passive connection -> Active connection */ + cobo_listen(sockfd); + cobo_forest_parents_fd[i] = cobo_accept_and_handshake(sockfd); + cobo_forest_childs_fd[i] = cobo_connect_rank(conn_rank); + } + num_successive_listen_rank_num = num_successive_listen_rank_num << 1; + } + close(sockfd); + cobo_is_forest_opened = 1; + return COBO_SUCCESS; +} + + +/* open socket forest */ +int cobo_close_forest() +{ + int i; + for (i = 0; i < cobo_num_forest_childs; i++) { + close(cobo_forest_childs_fd[i]); + close(cobo_forest_parents_fd[i]); + } + cobo_is_forest_opened = 0; + cobo_free(cobo_forest_childs); + cobo_free(cobo_forest_parents); + cobo_free(cobo_forest_childs_fd); + cobo_free(cobo_forest_parents_fd); + return COBO_SUCCESS; +} + /* * ============================= * Functions to bcast/gather/scatter with root as rank 0 using the TCP/socket tree. @@ -1148,12 +1323,7 @@ static int cobo_scatter_tree(void* sendbuf, int sendcount, void* recvbuf) return rc; } -int cobo_get_child_socket(int num, int *fd) -{ - assert(num < cobo_num_child); - *fd = cobo_child_fd[num]; - return COBO_SUCCESS; -} + /* * ========================================================================== @@ -1163,12 +1333,152 @@ int cobo_get_child_socket(int num, int *fd) * ========================================================================== */ -/* NEW */ -int cobo_get_num_childs(int* num_childs) { - *num_childs=cobo_num_child; +int cobo_get_num_tree(int *num_trees) +{ + *num_trees = cobo_nprocs; + return COBO_SUCCESS; +} + +int cobo_get_forest_child_socket(int root, int num, int *fd) +{ + int num_forest_childs; + /* if (!cobo_is_forest_opened) { */ + /* if (root == COBO_PRIMARY_TREE) { */ + /* return cobo_get_child_socket(num, fd); */ + /* } else { */ + /* err_printf("Trying to use forest before cobo_open_forest"); */ + /* exit(1); */ + /* } */ + /* } */ + if (root == COBO_FOREST) { + *fd = cobo_forest_childs_fd[num]; + return COBO_SUCCESS; + } + + cobo_get_num_forest_childs(root, &num_forest_childs); + if (num >= num_forest_childs) { + cobo_dbg_printf("Requested child %d, but # of childs is %d", num, num_forest_childs); + exit(1); + } + *fd = cobo_forest_childs_fd[num]; + return COBO_SUCCESS; +} + +int cobo_get_num_forest_childs(int root, int* num_forest_childs) +{ + int num_childs = 0; + int logical_rank = 0; /*logical_rank of the root is 0*/ + int tmp_cobo_nprocs; + /* if (!cobo_is_forest_opened) { */ + /* if (root == COBO_PRIMARY_TREE) { */ + /* return cobo_get_num_childs(num_forest_childs); */ + /* } else { */ + /* err_printf("Trying to use forest before cobo_open_forest"); */ + /* exit(1); */ + /* } */ + /* } */ + if (root == COBO_FOREST) { + *num_forest_childs = cobo_num_forest_childs; + return COBO_SUCCESS; + } + + /*TODO: memoriaze already-computed num in an array, and simply return it*/ + /*Making the root's logical_rank 0 in this tree (root)*/ + logical_rank = (cobo_me + cobo_nprocs - root) % cobo_nprocs; + tmp_cobo_nprocs = cobo_nprocs >> 1; + // cobo_dbg_printf("logical_rank: %d, cobo_nprocs: %d", logical_rank, tmp_cobo_nprocs); + while(tmp_cobo_nprocs) { + if (logical_rank & 0x1) break; + // cobo_dbg_printf(" logical_rank: %d, cobo_nprocs: %d", logical_rank, tmp_cobo_nprocs); + num_childs++; + logical_rank = logical_rank >> 1; + tmp_cobo_nprocs = tmp_cobo_nprocs >> 1; + } + *num_forest_childs = num_childs; return COBO_SUCCESS; } +int cobo_get_forest_parent_socket(int root, int *fd) +{ + int num_childs; + int forest_parents_fd_index; + /* if (!cobo_is_forest_opened) { */ + /* if (root == COBO_PRIMARY_TREE) { */ + /* return cobo_get_parent_socket(fd); */ + /* } else { */ + /* err_printf("Trying to use forest before cobo_open_forest"); */ + /* exit(1); */ + /* } */ + /* } */ + if (cobo_me == root) { + if (cobo_me != 0) { + cobo_dbg_printf("root (tree_id=%d) does not have parent", root); + exit(1); + } + *fd = cobo_parent_fd; + } else { + cobo_get_num_forest_childs(root, &num_childs); + forest_parents_fd_index = num_childs; + *fd = cobo_forest_parents_fd[forest_parents_fd_index]; + } + return COBO_SUCCESS; +} + +int cobo_get_num_forest_parents(int root, int *num_parents) +{ + /* if (!cobo_is_forest_opened) { */ + /* if (root == COBO_PRIMARY_TREE) { */ + /* *num_parents = 1; */ + /* return COBO_SUCCESS; */ + /* } else { */ + /* err_printf("Trying to use forest before cobo_open_forest"); */ + /* exit(1); */ + /* } */ + /* } */ + if (root == COBO_FOREST) { + *num_parents = cobo_num_forest_childs; + } else { + *num_parents = 1; + } + return COBO_SUCCESS; +} + +int cobo_get_forest_parent_socket_at(int num, int *fd) +{ + /* if (!cobo_is_forest_opened) { */ + /* if (num == COBO_PRIMARY_TREE) { */ + /* *fd = cobo_parent_fd; */ + /* return COBO_SUCCESS; */ + /* } else { */ + /* err_printf("Trying to use forest before cobo_open_forest"); */ + /* exit(1); */ + /* } */ + /* } */ + *fd = cobo_forest_parents_fd[num]; + return COBO_SUCCESS; +} + +int cobo_get_child_socket(int num, int *fd) +{ + /* if (cobo_is_forest_opened) { */ + /* cobo_get_forest_child_socket(0, num, fd); */ + /* } else { */ + /* assert(num < cobo_num_child); */ + /* *fd = cobo_child_fd[num]; */ + /* } */ + cobo_get_forest_child_socket(0, num, fd); + return COBO_SUCCESS; +} + +int cobo_get_num_childs(int* num_childs) { + /* if (cobo_is_forest_opened) { */ + /* cobo_get_num_forest_childs(0, num_childs); */ + /* } else { */ + /* *num_childs=cobo_num_child; */ + /* } */ + cobo_get_num_forest_childs(0, num_childs); + return COBO_SUCCESS; +} /* fills in fd with socket file desriptor to our parent */ @@ -1177,12 +1487,17 @@ int cobo_get_num_childs(int* num_childs) { * and forces sockets */ int cobo_get_parent_socket(int* fd) { - if (cobo_parent_fd != -1) { - *fd = cobo_parent_fd; - return COBO_SUCCESS; - } - - return -1; /* failure RCs? */ + /* if (cobo_is_forest_opened){ */ + /* cobo_get_forest_parent_socket(0, fd); */ + /* } else { */ + /* if (cobo_parent_fd != -1) { */ + /* *fd = cobo_parent_fd; */ + /* return COBO_SUCCESS; */ + /* } */ + /* return -1; /\* failure RCs? *\/ */ + /* } */ + cobo_get_forest_parent_socket(0, fd); + return COBO_SUCCESS; } /* Perform barrier, each task writes an int then waits for an int */ @@ -1321,6 +1636,11 @@ int cobo_alltoall(void* sendbuf, int sendcount, void* recvbuf) return rc; } +/* + * Perform MPI-like Allreduce maximum of a single int from each task + */ +static int cobo_allreduce(){} + /* * Perform MPI-like Allreduce maximum of a single int from each task */ @@ -1394,6 +1714,7 @@ int cobo_allgather_str(char* sendstr, char*** recvstr, char** recvbuf) return COBO_SUCCESS; } + /* provide list of ports and number of ports as input, get number of tasks and my rank as output */ int cobo_open(uint64_t sessionid, int* portlist, int num_ports, int* rank, int* num_ranks) { @@ -1401,6 +1722,7 @@ int cobo_open(uint64_t sessionid, int* portlist, int num_ports, int* rank, int* char *value; struct timeval start, end; + cobo_gettimeofday(&time_open); cobo_gettimeofday(&start); /* we now know this process is a client, although we don't know what our rank is yet */ @@ -1454,6 +1776,9 @@ int cobo_open(uint64_t sessionid, int* portlist, int num_ports, int* rank, int* exit(1); } + /* open the forest */ + cobo_open_forest(); + if (cobo_me == 0) { cobo_gettimeofday(&tree_end); debug_printf3("Exiting cobo_close(), took %f seconds for %d procs\n", cobo_getsecs(&tree_end,&tree_start), cobo_nprocs); @@ -1465,6 +1790,7 @@ int cobo_open(uint64_t sessionid, int* portlist, int num_ports, int* rank, int* cobo_gettimeofday(&end); debug_printf3("Exiting cobo_init(), took %f seconds for %d procs\n", cobo_getsecs(&end,&start), cobo_nprocs); + return COBO_SUCCESS; } @@ -1475,6 +1801,10 @@ int cobo_close() cobo_gettimeofday(&start); debug_printf3("Starting cobo_close()"); + /* shut doen the forest*/ + if (cobo_is_forest_opened) { + cobo_close_forest(); + } /* shut down the tree */ cobo_close_tree(); @@ -1486,6 +1816,9 @@ int cobo_close() debug_printf3("Exiting cobo_close(), took %f seconds for %d procs\n", cobo_getsecs(&end,&start), cobo_nprocs); debug_printf3("Total time from cobo_open() to cobo_close() took %f seconds for %d procs\n", cobo_getsecs(&time_close, &time_open), cobo_nprocs); + /* if (cobo_me == 0) { */ + /* cobo_dbg_printf("Total time: %f seconds (%d procs)", cobo_getsecs(&time_close, &time_open), cobo_nprocs); */ + /* } */ return COBO_SUCCESS; } @@ -1586,7 +1919,7 @@ int cobo_server_close() /* free data structures */ cobo_free(cobo_ports); cobo_free(cobo_hostlist); - + return COBO_SUCCESS; } diff --git a/src/cobo/handshake.c b/src/cobo/handshake.c index 382e9765..21129dc0 100644 --- a/src/cobo/handshake.c +++ b/src/cobo/handshake.c @@ -66,6 +66,7 @@ Place, Suite 330, Boston, MA 02111-1307 USA if (debug_file) { \ fprintf(debug_file, "ERROR: [%s:%u] - %s", BASE_FILE, __LINE__, last_error_message); \ } \ + fprintf(stderr, "ERROR: [%s:%u] - %s", BASE_FILE, __LINE__, last_error_message); \ } while (0) #define security_error_printf(format, ...) \ @@ -239,6 +240,7 @@ static int handshake_wrapper(int sockfd, handshake_protocol_t *hdata, uint64_t s return_result = result; goto done; } + for (;;) { result = handshake_main(sockfd, hdata, session_id, is_server); @@ -786,8 +788,14 @@ static int reliable_read(int fd, void *buf, size_t size) } result = read(fd, ((unsigned char *) buf) + bytes_read, size - bytes_read); if (result <= 0) { - error_printf("Expected error return %d when reading from socket: %s\n", result, +#if 1 /*Kento modified*/ + /*During handshaking, connection can be dropped. So handle this as debug print */ + debug_printf("Expected error return %d when reading from socket: %s\n", result, + strerror(errno)); +#else + error_printf("Expected error return %d when reading from socket: %s\n", result, strerror(errno)); +#endif return HSHAKE_INTERNAL_ERROR; } else diff --git a/src/cobo/ldcs_cobo.h b/src/cobo/ldcs_cobo.h index bbef99a1..2dd01531 100644 --- a/src/cobo/ldcs_cobo.h +++ b/src/cobo/ldcs_cobo.h @@ -39,6 +39,10 @@ extern "C" { #include "handshake.h" #define COBO_SUCCESS (0) +#define COBO_PRIMARY_TREE (0) +#define COBO_FOREST (-1) +#define COBO_ALL_PARENTS (-2) +#define COBO_ALL_CHILDS (-3) #define COBO_NAMESPACE ldcs @@ -46,6 +50,7 @@ extern "C" { #define COMBINE2(a, b) a ## _ ## b #define COMBINE(a, b) COMBINE2(a, b) #define cobo_open COMBINE(COBO_NAMESPACE, cobo_open) +#define cobo_open_forest COMBINE(COBO_NAMESPACE, cobo_open_forest) #define cobo_close COMBINE(COBO_NAMESPACE, cobo_close) #define cobo_get_parent_socket COMBINE(COBO_NAMESPACE, cobo_get_parent_socket) #define cobo_barrier COMBINE(COBO_NAMESPACE, cobo_barrier) @@ -76,6 +81,9 @@ extern "C" { /* provide list of ports and number of ports as input, get number of tasks and my rank as output */ int cobo_open(uint64_t sessionid, int* portlist, int num_ports, int* rank, int *num_ranks); +/* Convet form tree to forest */ +int cobo_open_forest(); + /* shut down the connections between tasks and free data structures */ int cobo_close(); @@ -148,8 +156,13 @@ int cobo_server_get_root_socket(int* fd); extern double __cobo_ts; +int cobo_get_num_tree(int *num_trees); +int cobo_get_forest_child_socket(int tree_id, int num, int *fd); +int cobo_get_num_forest_childs(int tree_id, int* num_childs); +int cobo_get_forest_parent_socket(int tree_id, int *fd); +int cobo_get_forest_parent_socket_at(int num, int *fd); +int cobo_get_num_forest_parents(int tree_id, int *num_parents); int cobo_get_num_childs(int* num_childs); - /* Methods to access child fds */ int cobo_get_child_socket(int num, int *fd); diff --git a/src/fe/config.h.in b/src/fe/config.h.in index 60f65abf..9bfe536f 100644 --- a/src/fe/config.h.in +++ b/src/fe/config.h.in @@ -130,6 +130,9 @@ /* The default port for Spindle */ #undef SPINDLE_PORT +/* The default number of roots in a cobo network */ +#undef SPINDLE_ROOTS + /* Define to 1 if you have the ANSI C header files. */ #undef STDC_HEADERS diff --git a/src/fe/configure b/src/fe/configure index 05bd6846..948a1857 100755 --- a/src/fe/configure +++ b/src/fe/configure @@ -805,6 +805,7 @@ enable_fast_install with_gnu_ld with_sysroot enable_libtool_lock +with_default_roots with_default_port with_default_num_ports with_localstorage @@ -1504,6 +1505,8 @@ Optional Packages: --with-gnu-ld assume the C compiler uses GNU ld [default=no] --with-sysroot=DIR Search for dependent libraries within DIR (or the compiler's sysroot if not specified). + --with-default-roots=NUM + The number of roots in a cobo network --with-default-port=NUM TCP/IP Port for Spindle server communication --with-default-numports=NUM Number of TCP/IP ports to scan for Spindle server @@ -16247,11 +16250,20 @@ ac_compiler_gnu=$ac_cv_c_compiler_gnu #Configure operations that are common between the Spindle and Spindle-client configurations #Network port and local storage location +DEFAULT_ROOTS=1 DEFAULT_PORT=21940 DEFAULT_LOC='$TMPDIR' DEFAULT_NUM_COBO_PORTS=25 +# Check whether --with-default-roots was given. +if test "${with_default_roots+set}" = set; then : + withval=$with_default_roots; SPINDLE_ROOTS=${withval} +else + SPINDLE_ROOTS=$DEFAULT_ROOTS +fi + + # Check whether --with-default-port was given. if test "${with_default_port+set}" = set; then : withval=$with_default_port; SPINDLE_PORT=${withval} @@ -16276,6 +16288,11 @@ else fi +cat >>confdefs.h <<_ACEOF +#define SPINDLE_ROOTS $SPINDLE_ROOTS +_ACEOF + + cat >>confdefs.h <<_ACEOF #define SPINDLE_PORT $SPINDLE_PORT _ACEOF diff --git a/src/fe/startup/parseargs.cc b/src/fe/startup/parseargs.cc index aabe27ce..6b1753fc 100644 --- a/src/fe/startup/parseargs.cc +++ b/src/fe/startup/parseargs.cc @@ -45,6 +45,7 @@ using namespace std; #define AUDITTYPE 'k' #define RELOCSO 'l' #define NOCLEAN 'n' +#define COBO_FOREST 'm' #define LOCATION 'o' #define PUSH 'p' #define PULL 'q' @@ -163,6 +164,7 @@ static const int DISABLE_LOGGING_FLAGS = OPTION_HIDDEN; #endif static bool logging_enabled = DEFAULT_LOGGING_ENABLED; +static unsigned int spindle_roots = SPINDLE_ROOTS; static unsigned int spindle_port = SPINDLE_PORT; static unsigned int num_ports = NUM_COBO_PORTS; @@ -193,6 +195,8 @@ struct argp_option options[] = { "These options configure Spindle's network model. Typical Spindle runs should not need to set these.", GROUP_NETWORK }, { "cobo", COBO, NULL, 0, "Use a tree-based cobo network for distributing objects", GROUP_NETWORK }, + { "roots", COBO_FOREST, "X", 0, + "The number of roots in a cobo network. Default: " STR(SPINDLE_ROOTS), GROUP_NETWORK }, { "port", PORT, "port1-port2", 0, "TCP/IP port range for Spindle servers. Default: " STR(SPINDLE_PORT) "-" STR(SPINDLE_MAX_PORT), GROUP_NETWORK }, { NULL, 0, NULL, 0, @@ -321,6 +325,15 @@ static int parse(int key, char *arg, struct argp_state *vstate) preload_file = arg; return 0; } + else if (entry->key == COBO_FOREST) { + int v = atoi(arg); + if (v <= 0) { + argp_error(state, "'roots' was given a negative or 0 value or invalid value"); + } else { + spindle_roots = v; + } + return 0; + } else if (entry->key == PORT) { spindle_port = atoi(arg); if (!spindle_port) { @@ -503,6 +516,11 @@ char *getPreloadFile() return preload_file; } +unsigned int getRoots() +{ + return spindle_roots; +} + unsigned int getPort() { return spindle_port; @@ -636,6 +654,7 @@ void parseCommandLine(int argc, char *argv[], spindle_args_t *args) opt_t opts = parseArgs(argc, argv); args->number = getpid(); + args->num_roots = getRoots(); args->port = getPort(); args->num_ports = getNumPorts(); args->opts = opts; diff --git a/src/fe/startup/spindle_fe.cc b/src/fe/startup/spindle_fe.cc index 863e6d1f..bbc0b2a0 100644 --- a/src/fe/startup/spindle_fe.cc +++ b/src/fe/startup/spindle_fe.cc @@ -60,7 +60,7 @@ void pack_param(char *value, char *buffer, unsigned int &pos) static int pack_data(spindle_args_t *args, void* &buffer, unsigned &buffer_size) { - buffer_size = sizeof(unsigned int) * 6; + buffer_size = sizeof(unsigned int) * 7; buffer_size += sizeof(opt_t); buffer_size += sizeof(unique_id_t); buffer_size += args->location ? strlen(args->location) + 1 : 1; @@ -70,6 +70,7 @@ static int pack_data(spindle_args_t *args, void* &buffer, unsigned &buffer_size) unsigned int pos = 0; char *buf = (char *) malloc(buffer_size); pack_param(args->number, buf, pos); + pack_param(args->num_roots, buf, pos); pack_param(args->port, buf, pos); pack_param(args->num_ports, buf, pos); pack_param(args->opts, buf, pos); diff --git a/src/fe/startup/spindle_fe_main.cc b/src/fe/startup/spindle_fe_main.cc index 3bfa31a6..37d67acd 100644 --- a/src/fe/startup/spindle_fe_main.cc +++ b/src/fe/startup/spindle_fe_main.cc @@ -77,7 +77,6 @@ int main(int argc, char *argv[]) bare_printf2("%s ", app_argv[i]); } bare_printf2("\n"); - if (params.use_launcher == serial_launcher) { debug_printf("Starting application in serial mode\n"); result = startSerialFE(app_argc, app_argv, daemon_argc, daemon_argv, ¶ms); diff --git a/src/include/spindle_launch.h b/src/include/spindle_launch.h index 6244e5df..999f7e98 100644 --- a/src/include/spindle_launch.h +++ b/src/include/spindle_launch.h @@ -80,6 +80,9 @@ typedef struct { /* A unique number that will be used to identify this spindle session */ unsigned int number; + /* The number of roots in the cobo network */ + unsigned int num_roots; + /* The beginning port in a range that will be used for server->server communication */ unsigned int port; diff --git a/src/logging/spindle_debug.h b/src/logging/spindle_debug.h index 65429987..653c4ba1 100644 --- a/src/logging/spindle_debug.h +++ b/src/logging/spindle_debug.h @@ -20,7 +20,7 @@ Place, Suite 330, Boston, MA 02111-1307 USA #include #include -#define LOGD_DEBUG +#define LOGD_DEBUG #if defined(LOGD_DEBUG) #define LDCSDEBUG 1 @@ -31,12 +31,14 @@ extern "C" { #if defined(__cplusplus) } #endif + #elif defined(DEBUG) #define LDCSDEBUG 1 #define debug_printf(format, ...) \ do { \ fprintf(stderr, "[%s:%u@%d] - " format, __FILE__, __LINE__, getpid(), ## __VA_ARGS__); \ } while (0) + #elif defined(SIONDEBUG) #define LDCSDEBUG 1 #include "sion_debug.h" @@ -44,8 +46,10 @@ extern "C" { do { \ sion_dprintfp(32, __FILE__, getpid(), "[L%04u, %12.2f] - " format, __LINE__,_sion_get_time(), ## __VA_ARGS__); \ } while (0) + #else #define debug_printf(format, ...) + #endif #if defined(LOGD_DEBUG) @@ -66,4 +70,21 @@ extern "C" { #define err_printf(S, ...) debug_printf(S, ## __VA_ARGS__) #endif + +//#define FOREST_DEBUG +#if defined(FOREST_DEBUG) +#define cobo_dbg_printf(format, ...) \ + do { \ + fprintf(stderr, "COBO:%6d: " format " (%s:%d)\n", getpid(), ## __VA_ARGS__, __FILE__, __LINE__); \ + } while (0) +#undef err_printf +#define err_printf cobo_dbg_printf +#else +#define cobo_dbg_printf(format, ...) +#define md_cobo_dbg_printf(format, ...) +#endif + #endif + + + diff --git a/src/server/auditserver/ldcs_audit_server_handlers.c b/src/server/auditserver/ldcs_audit_server_handlers.c index 25f6f107..7bd5a268 100644 --- a/src/server/auditserver/ldcs_audit_server_handlers.c +++ b/src/server/auditserver/ldcs_audit_server_handlers.c @@ -798,6 +798,7 @@ static int handle_exit_broadcast(ldcs_process_data_t *procdata) out_msg.header.len = 0; out_msg.data = NULL; + procdata->md_path = NULL; ldcs_audit_server_md_broadcast(procdata, &out_msg); mark_exit(); @@ -819,11 +820,13 @@ static int handle_request(ldcs_process_data_t *procdata, node_peer_t from, ldcs_ err_printf("Badly formed request message with starting char '%c'\n", msg_type); return -1; } + procdata->md_path = pathname; is_dir = (msg_type == 'D'); if (is_dir) return handle_request_directory(procdata, from, pathname); else return handle_request_file(procdata, from, pathname); + procdata->md_path = NULL; return result; } @@ -1002,8 +1005,9 @@ static int handle_send_directory_query(ldcs_process_data_t *procdata, char *dire bytes_written = snprintf(out_msg.data, MAX_PATH_LEN+1, "D%s", directory); out_msg.header.len = bytes_written+1; - + procdata->md_path = directory; ldcs_audit_server_md_forward_query(procdata, &out_msg); + procdata->md_path = NULL; return 0; } @@ -1023,7 +1027,10 @@ static int handle_send_file_query(ldcs_process_data_t *procdata, char *fullpath) bytes_written = snprintf(out_msg.data, MAX_PATH_LEN+1, "F%s", fullpath); out_msg.header.len = bytes_written+1; + + procdata->md_path = fullpath; ldcs_audit_server_md_forward_query(procdata, &out_msg); + procdata->md_path = NULL; return 0; } @@ -1155,7 +1162,9 @@ int handle_send_msg_to_keys(ldcs_process_data_t *procdata, ldcs_message_t *msg, if (procdata->dist_model == LDCS_PUSH || force_broadcast) { debug_printf3("Pushing message to all children\n"); + procdata->md_path = key; result = ldcs_audit_server_md_broadcast_noncontig(procdata, msg, secondary_data, secondary_size); + procdata->md_path = NULL; if (result == -1) global_result = -1; have_done_broadcast = 1; @@ -1382,6 +1391,7 @@ static int handle_preload_done(ldcs_process_data_t *procdata) done_msg.header.len = 0; done_msg.data = NULL; + procdata->md_path = NULL; result = ldcs_audit_server_md_broadcast(procdata, &done_msg); if (result == -1) { err_printf("Error broadcasting done message during preload\n"); @@ -1409,6 +1419,7 @@ static int handle_recv_selfload_file(ldcs_process_data_t *procdata, ldcs_message int result, nc, global_result = 0, found_client = 0; debug_printf("Recieved notice to selfload file %s\n", filename); + result = handle_send_msg_to_keys(procdata, msg, filename, NULL, 0, request_broadcast, 0); if (result == -1) { err_printf("Could not send selfload file message\n"); @@ -1745,6 +1756,7 @@ static int handle_broadcast_metadata(ldcs_process_data_t *procdata, char *pathna /* Send packet on network */ starttime = ldcs_get_time(); + result = handle_send_msg_to_keys(procdata, &msg, pathname, NULL, 0, 0, 1); procdata->server_stat.libdist.cnt++; procdata->server_stat.libdist.bytes += packet_size; @@ -1858,6 +1870,7 @@ static int handle_metadata_request(ldcs_process_data_t *procdata, char *pathname { ldcs_message_t msg; int pathlen; + int ret; if (been_requested(procdata->pending_metadata_requests, pathname)) { debug_printf2("Metadata %s has already been requested. Not resending request\n", pathname); @@ -1877,7 +1890,10 @@ static int handle_metadata_request(ldcs_process_data_t *procdata, char *pathname msg.header.len = pathlen; msg.data = pathname; - return ldcs_audit_server_md_forward_query(procdata, &msg); + procdata->md_path = pathname; + ret = ldcs_audit_server_md_forward_query(procdata, &msg); + procdata->md_path = NULL; + return ret; } /** @@ -1999,6 +2015,7 @@ static int handle_send_exit_ready_if_done(ldcs_process_data_t *procdata) } else { debug_printf2("Sending exit ready message to parent\n"); + procdata->md_path = NULL; return ldcs_audit_server_md_forward_query(procdata, &msg); } } @@ -2054,5 +2071,9 @@ static int handle_send_exit_cancel(ldcs_process_data_t *procdata) msg.header.len = 0; msg.data = NULL; + procdata->md_path = NULL; +#ifdef LDCS_DBG + cobo_dbg_printf("handle_send_exit_cancel: %s", procdata->md_path); +#endif return ldcs_audit_server_md_forward_query(procdata, &msg); } diff --git a/src/server/auditserver/ldcs_audit_server_md.h b/src/server/auditserver/ldcs_audit_server_md.h index da692408..1cb610eb 100644 --- a/src/server/auditserver/ldcs_audit_server_md.h +++ b/src/server/auditserver/ldcs_audit_server_md.h @@ -59,6 +59,9 @@ typedef void* node_peer_t; /* Any initialization can be done here. */ int ldcs_audit_server_md_init(unsigned int port, unsigned int num_ports, unique_id_t unique_id, ldcs_process_data_t *data); +/* Any further initialization (after receiving spindle command arguments) can be done here. */ +int ldcs_audit_server_md_init_post_process(unsigned int md_roots); + /* register_fd should, for every fd we want Spindle to recv messages on, call ldcs_listen_register_fd with the fd and a callback function to be triggered when a message arrives. */ @@ -101,6 +104,8 @@ int ldcs_audit_server_md_send_noncontig(ldcs_process_data_t *ldcs_process_data, int ldcs_audit_server_md_broadcast_noncontig(ldcs_process_data_t *ldcs_process_data, ldcs_message_t *msg, void *secondary_data, size_t secondary_size); +void ldcs_audit_server_md_barrier(); + int ldcs_audit_server_md_get_num_children(ldcs_process_data_t *procdata); #if defined(__cplusplus) diff --git a/src/server/auditserver/ldcs_audit_server_md_cobo.c b/src/server/auditserver/ldcs_audit_server_md_cobo.c index 65da5506..f415f339 100644 --- a/src/server/auditserver/ldcs_audit_server_md_cobo.c +++ b/src/server/auditserver/ldcs_audit_server_md_cobo.c @@ -21,6 +21,8 @@ #include #include #include +#include +#include #include "ldcs_api.h" #include "ldcs_api_listen.h" @@ -33,13 +35,77 @@ #include "cobo_comm.h" #include "config.h" + int ldcs_audit_server_md_cobo_CB ( int fd, int nc, void *data ); int ldcs_audit_server_md_cobo_send_msg ( int fd, ldcs_message_t *msg ); extern unique_id_t unique_id; +int cobo_rank = -1; +int cobo_size = -1; +int spindle_root_count = 1; +int spindle_root_hop = -1; + extern int ll_read(int fd, void *buf, size_t count); +static void ldcs_audit_server_md_backtrace_print() +{ + int j, nptrs; + void *buffer[100]; + char **strings; + + nptrs = backtrace(buffer, 100); + /* backtrace_symbols_fd(buffer, nptrs, STDOUT_FILENO)*/ + strings = backtrace_symbols(buffer, nptrs); + if (strings == NULL) { + perror("backtrace_symbols"); + exit(EXIT_FAILURE); + } + for (j = 0; j < nptrs; j++) { + debug_printf(" %s", strings[j]); + } + free(strings); + + return; +} + +static unsigned int ldcs_audit_server_md_hashval(char* str) +{ + unsigned int hash = 5381; + unsigned int c; + while ((c = *str++)) { + hash = ((hash << 5) + hash) + c; /* hash * 33 + c */ + } + return hash; +} + +//static int ldcs_audit_server_md_get_responsible_tree_id(ldcs_process_data_t *ldcs_process_data) +static int ldcs_audit_server_md_get_responsible_tree_id(char *path) +{ + int responsible_tree_id = COBO_PRIMARY_TREE; + int ret; + struct stat st; + char *dir, *path_dup; + + /*kento*/ + /*TODO: Can I use message header for this if-else to find out + if tree_id is for management communication or library broadcast ? */ + if (path != NULL && strlen(path) > 0) { + path_dup = strdup(path); + ret = stat(path_dup, &st); + if ((st.st_mode & S_IFMT) == S_IFDIR) { + /* path is directory */ + dir = path_dup; + } else { + /* path not is directory */ + dir = dirname(path_dup); + } + responsible_tree_id = ldcs_audit_server_md_hashval(dir) % spindle_root_count; + free(path_dup); + } + return responsible_tree_id; +} + int read_msg(int fd, node_peer_t *peer, ldcs_message_t *msg) { int result; @@ -84,6 +150,7 @@ int ldcs_audit_server_md_init(unsigned int port, unsigned int num_ports, unsigned int *portlist; int my_rank, ranks, fanout; int i; + char* env; portlist = malloc(sizeof(unsigned int) * (num_ports + 1)); for (i = 0; i < num_ports; i++) { @@ -100,99 +167,170 @@ int ldcs_audit_server_md_init(unsigned int port, unsigned int num_ports, debug_printf2("cobo_open complete. Cobo rank %d/%d\n", my_rank, ranks); free(portlist); - data->server_stat.md_rank = data->md_rank = my_rank; - data->server_stat.md_size = data->md_size = ranks; + cobo_rank = data->server_stat.md_rank = data->md_rank = my_rank; + cobo_size = data->server_stat.md_size = data->md_size = ranks; + spindle_root_hop = cobo_size; data->md_listen_to_parent = 0; - cobo_get_num_childs(&fanout); + // cobo_get_num_childs(&fanout); + cobo_get_num_forest_childs(COBO_PRIMARY_TREE, &fanout); data->server_stat.md_fan_out = data->md_fan_out = fanout; cobo_barrier(); - /* send ack about being ready */ if (data->md_rank == 0) { int root_fd, ack=13; - /* send fe client signal to stop (ack) */ - cobo_get_parent_socket(&root_fd); + // cobo_get_parent_socket(&root_fd); + cobo_get_forest_parent_socket(COBO_PRIMARY_TREE, &root_fd); ldcs_cobo_write_fd(root_fd, &ack, sizeof(ack)); debug_printf3("sent FE client signal that server are ready %d\n",ack); } - + return(rc); } -int ldcs_audit_server_md_register_fd ( ldcs_process_data_t *ldcs_process_data ) { +int ldcs_audit_server_md_init_post_process(unsigned int md_roots) +{ + if (md_roots > 1) { + spindle_root_count = md_roots; + if (spindle_root_count > cobo_size || spindle_root_count <= 0) { + err_printf("spindle_root_count(%d) error", spindle_root_count); + exit(1); + } + spindle_root_hop = cobo_size / spindle_root_count; + if (cobo_rank == 0) { + cobo_dbg_printf("root_count: %d root_hop: %d", spindle_root_count, spindle_root_hop); + } + } else { + spindle_root_count = 1; + spindle_root_hop = cobo_size / spindle_root_count; + } + return 1; +} + +void ldcs_audit_server_md_barrier() +{ + cobo_barrier(); + return; +} + + +int ldcs_audit_server_md_register_fd ( ldcs_process_data_t *ldcs_process_data ) +{ int rc=0, i; int parent_fd, child_fd; - int num_childs; - - if(cobo_get_parent_socket(&parent_fd)!=COBO_SUCCESS) { - err_printf("Error, could not get parent socket\n"); - assert(0); + int num_parents, num_childs; + + /* Registering parents */ + cobo_get_num_forest_parents(COBO_FOREST, &num_parents); + for (i = 0; i < num_parents; i++) { + if(cobo_get_forest_parent_socket_at(i, &parent_fd)!=COBO_SUCCESS) { + err_printf("Error, could not get parent socket\n"); + assert(0); + } + debug_printf3("Registering fd %d for cobo parent connection\n",parent_fd); + ldcs_listen_register_fd(parent_fd, 0, &ldcs_audit_server_md_cobo_CB, (void *) ldcs_process_data); } - debug_printf3("Registering fd %d for cobo parent connection\n",parent_fd); - ldcs_listen_register_fd(parent_fd, 0, &ldcs_audit_server_md_cobo_CB, (void *) ldcs_process_data); + /* Registering spindle_fe_main parent */ + if (ldcs_process_data->md_rank == 0) { + if(cobo_get_forest_parent_socket(COBO_PRIMARY_TREE, &parent_fd)!=COBO_SUCCESS) { + err_printf("Error, could not get parent socket\n"); + assert(0); + } + debug_printf3("Registering fd %d for cobo parent connection\n",parent_fd); + ldcs_listen_register_fd(parent_fd, 0, &ldcs_audit_server_md_cobo_CB, (void *) ldcs_process_data); + } ldcs_process_data->md_listen_to_parent=1; - cobo_get_num_childs(&num_childs); + /* Registering childs */ + cobo_get_num_forest_childs(COBO_FOREST, &num_childs); for (i = 0; imd_listen_to_parent) { - if(cobo_get_parent_socket(&parent_fd)!=COBO_SUCCESS) { - _error("cobo internal error (parent socket)"); - } - ldcs_process_data->md_listen_to_parent=0; - ldcs_listen_unregister_fd(parent_fd); - - cobo_get_num_childs(&num_childs); - for (i = 0; imd_listen_to_parent) return rc; + + /* Registering parents */ + cobo_get_num_forest_parents(COBO_FOREST, &num_parents); + for (i = 0; i < num_parents; i++) { + if(cobo_get_forest_parent_socket_at(i, &parent_fd)!=COBO_SUCCESS) { + err_printf("Error, could not get parent socket\n"); + assert(0); + } + debug_printf3("Registering fd %d for cobo parent connection\n",parent_fd); + ldcs_listen_unregister_fd(parent_fd); } + /* Registering spindle_fe_main parent */ + if (ldcs_process_data->md_rank == 0) { + if(cobo_get_forest_parent_socket(COBO_PRIMARY_TREE, &parent_fd)!=COBO_SUCCESS) { + err_printf("Error, could not get spindle_fe parent socket\n"); + assert(0); + } + debug_printf3("Registering fd %d for cobo spindle_fe parent connection\n",parent_fd); + ldcs_listen_unregister_fd(parent_fd); + } + ldcs_process_data->md_listen_to_parent=0; + + /* Registering childs */ + cobo_get_num_forest_childs(COBO_FOREST, &num_childs); + for (i = 0; imd_rank == 0) { - debug_printf3("Decided I am responsible for file %s\n", filename); - return 1; - } else { - debug_printf3("Decided I am not responsible for file %s\n", filename); - return 0; - } +int ldcs_audit_server_md_is_responsible ( ldcs_process_data_t *ldcs_process_data, char *filename ) +{ + int responsible_tree_id = ldcs_audit_server_md_get_responsible_tree_id(filename); + + if(ldcs_process_data->md_rank == responsible_tree_id) { + // cobo_dbg_printf("I am responsible for file: %s (tree_id: %d)", filename, responsible_tree_id); + debug_printf3("Decided I am responsible for file %s\n", filename); + return 1; + } + return 0; } int ldcs_audit_server_md_forward_query(ldcs_process_data_t *ldcs_process_data, ldcs_message_t* msg) { int parent_fd; int result; - if (ldcs_process_data->md_rank == 0) { + int responsible_tree_id; + + responsible_tree_id = ldcs_audit_server_md_get_responsible_tree_id(ldcs_process_data->md_path); + + if (ldcs_process_data->md_rank == responsible_tree_id) { /* We're root--no one to forward a query to*/ return 0; } - cobo_get_parent_socket(&parent_fd); + cobo_get_forest_parent_socket(responsible_tree_id, &parent_fd); result = write_msg(parent_fd, msg); if (result < 0) { err_printf("Problem writing message to parent, result is %d\n", result); @@ -236,8 +374,7 @@ int ldcs_audit_server_md_recv_from_parent(ldcs_message_t *msg) { int fd; node_peer_t peer; - - cobo_get_parent_socket(&fd); + cobo_get_forest_parent_socket(COBO_PRIMARY_TREE, &fd); return read_msg(fd, &peer, msg); } @@ -312,14 +449,18 @@ int ldcs_audit_server_md_broadcast(ldcs_process_data_t *ldcs_process_data, ldcs_ int fd, i; int result, global_result = 0; int num_childs = 0; + int responsible_tree_id = 0; + + responsible_tree_id = ldcs_audit_server_md_get_responsible_tree_id(ldcs_process_data->md_path); - cobo_get_num_childs(&num_childs); + cobo_get_num_forest_childs(responsible_tree_id, &num_childs); for (i = 0; imd_path); + cobo_get_num_forest_childs(responsible_tree_id, &num_childs); for (i = 0; iserver network */ ldcs_audit_server_md_init(port, num_ports, unique_id, &ldcs_process_data); + /* Use network to broadcast configuration parameters */ ldcs_message_t msg; msg.header.type = 0; msg.header.len = 0; msg.data = NULL; debug_printf2("Reading setup message from parent\n"); + result = ldcs_audit_server_md_recv_from_parent(&msg); if (result == -1) { err_printf("Error reading setup message from parent\n"); return -1; } assert(msg.header.type == LDCS_MSG_SETTINGS); + ldcs_process_data.md_path = NULL; + result = ldcs_audit_server_md_broadcast(&ldcs_process_data, &msg); if (result == -1) { err_printf("Error broadcast setup message to children\n"); @@ -88,6 +92,10 @@ int ldcs_audit_server_network_setup(unsigned int port, unsigned int num_ports, u *packed_setup_data = msg.data; *data_size = msg.header.len; + /* Synchronize here because a library file may be tranfered via ldcs_audit_server_md_recv_from_parent + if a fast process start sending files by using this tree. + */ + ldcs_audit_server_md_barrier(); return 0; } @@ -100,7 +108,9 @@ int ldcs_audit_server_process(spindle_args_t *args) ldcs_process_data.location = args->location; ldcs_process_data.number = args->number; ldcs_process_data.pythonprefix = args->pythonprefix; + ldcs_process_data.md_roots = args->num_roots; ldcs_process_data.md_port = args->port; + ldcs_process_data.md_path = NULL; ldcs_process_data.opts = args->opts; ldcs_process_data.pending_requests = new_requestor_list(); ldcs_process_data.completed_requests = new_requestor_list(); @@ -144,17 +154,22 @@ int ldcs_audit_server_process(spindle_args_t *args) ldcs_process_data.serverfd = fd; ldcs_audit_server_md_register_fd(&ldcs_process_data); - /* register server listen fd to listener */ if (fd != -1) ldcs_listen_register_fd(fd, serverid, &_ldcs_server_CB, (void *) &ldcs_process_data); - debug_printf3("Initializing cache\n"); ldcs_cache_init(); return 0; } +int ldcs_audit_server_network_post_setup(spindle_args_t* args) +{ + int result; + result = ldcs_audit_server_md_init_post_process(args->num_roots); + return result; +} + int ldcs_audit_server_run() { /* start loop */ diff --git a/src/server/auditserver/ldcs_audit_server_process.h b/src/server/auditserver/ldcs_audit_server_process.h index fe06a924..41675a1e 100644 --- a/src/server/auditserver/ldcs_audit_server_process.h +++ b/src/server/auditserver/ldcs_audit_server_process.h @@ -115,7 +115,7 @@ struct ldcs_process_data_struct char *pythonprefix; int number; int preload_done; - opt_t opts; + unsigned int opts; requestor_list_t pending_requests; requestor_list_t completed_requests; requestor_list_t pending_metadata_requests; @@ -126,7 +126,9 @@ struct ldcs_process_data_struct int md_size; int md_fan_out; /* number of childs */ int md_listen_to_parent; + int md_roots; unsigned int md_port; + char *md_path; /* statistics */ ldcs_server_stat_t server_stat; @@ -136,6 +138,7 @@ typedef struct ldcs_process_data_struct ldcs_process_data_t; int ldcs_audit_server_network_setup(unsigned int port, unsigned int num_ports, unique_id_t unique_id, void **packed_setup_data, int *data_size); int ldcs_audit_server_process (spindle_args_t *args); +int ldcs_audit_server_network_post_setup(spindle_args_t* args); int ldcs_audit_server_run(); #define CLIENT_CB_AUX_FD INT32_MAX diff --git a/src/server/config.h.in b/src/server/config.h.in index 879cef0d..d894a637 100644 --- a/src/server/config.h.in +++ b/src/server/config.h.in @@ -121,6 +121,9 @@ /* The default port for Spindle */ #undef SPINDLE_PORT +/* The default number of roots in a cobo network */ +#undef SPINDLE_ROOTS + /* Define to 1 if you have the ANSI C header files. */ #undef STDC_HEADERS diff --git a/src/server/configure b/src/server/configure index 49fa78d0..9f46b894 100755 --- a/src/server/configure +++ b/src/server/configure @@ -810,6 +810,7 @@ enable_fast_install with_gnu_ld with_sysroot enable_libtool_lock +with_default_roots with_default_port with_default_num_ports with_localstorage @@ -1503,6 +1504,8 @@ Optional Packages: --with-gnu-ld assume the C compiler uses GNU ld [default=no] --with-sysroot=DIR Search for dependent libraries within DIR (or the compiler's sysroot if not specified). + --with-default-roots=NUM + The number of roots in a cobo network --with-default-port=NUM TCP/IP Port for Spindle server communication --with-default-numports=NUM Number of TCP/IP ports to scan for Spindle server @@ -16246,11 +16249,20 @@ ac_compiler_gnu=$ac_cv_c_compiler_gnu #Configure operations that are common between the Spindle and Spindle-client configurations #Network port and local storage location +DEFAULT_ROOTS=1 DEFAULT_PORT=21940 DEFAULT_LOC='$TMPDIR' DEFAULT_NUM_COBO_PORTS=25 +# Check whether --with-default-roots was given. +if test "${with_default_roots+set}" = set; then : + withval=$with_default_roots; SPINDLE_ROOTS=${withval} +else + SPINDLE_ROOTS=$DEFAULT_ROOTS +fi + + # Check whether --with-default-port was given. if test "${with_default_port+set}" = set; then : withval=$with_default_port; SPINDLE_PORT=${withval} @@ -16275,6 +16287,11 @@ else fi +cat >>confdefs.h <<_ACEOF +#define SPINDLE_ROOTS $SPINDLE_ROOTS +_ACEOF + + cat >>confdefs.h <<_ACEOF #define SPINDLE_PORT $SPINDLE_PORT _ACEOF diff --git a/src/server/startup/spindle_be.cc b/src/server/startup/spindle_be.cc index 5e2ad2df..825dd5ff 100644 --- a/src/server/startup/spindle_be.cc +++ b/src/server/startup/spindle_be.cc @@ -50,6 +50,7 @@ static int unpack_data(spindle_args_t *args, void *buffer, int buffer_size) int pos = 0; char *buf = static_cast(buffer); unpack_param(args->number, buf, pos); + unpack_param(args->num_roots, buf, pos); unpack_param(args->port, buf, pos); unpack_param(args->num_ports, buf, pos); unpack_param(args->opts, buf, pos); @@ -130,7 +131,6 @@ int spindleRunBE(unsigned int port, unsigned int num_ports, unique_id_t unique_i assert(args.unique_id == unique_id); assert(args.port == port); - /* Expand environment variables in location. */ char *new_location = parse_location(args.location); if (!new_location) { @@ -141,6 +141,12 @@ int spindleRunBE(unsigned int port, unsigned int num_ports, unique_id_t unique_i free(args.location); args.location = new_location; + result = ldcs_audit_server_network_post_setup(&args); + if (result == -1) { + err_printf("Error in ldcs_audit_server_network_post_setup"); + return -1; + } + result = ldcs_audit_server_process(&args); if (result == -1) { err_printf("Error in ldcs_audit_server_process\n"); diff --git a/testsuite/runTests_forest b/testsuite/runTests_forest new file mode 100755 index 00000000..0d82e550 --- /dev/null +++ b/testsuite/runTests_forest @@ -0,0 +1,54 @@ +#!/bin/sh + +#export SPINDLE_TEST_ARGS="$@" +export SPINDLE_BLUEGENE="false" +num_roots=$1 + +./run_driver --dlopen --push --roots $num_roots +./run_driver --dlreopen --push --roots $num_roots +./run_driver --reorder --push --roots $num_roots +./run_driver --partial --push --roots $num_roots +./run_driver --ldpreload --push --roots $num_roots + +./run_driver --dependency --pull --roots $num_roots +./run_driver --dlopen --pull --roots $num_roots +./run_driver --dlreopen --pull --roots $num_roots +./run_driver --reorder --pull --roots $num_roots +./run_driver --partial --pull --roots $num_roots +./run_driver --ldpreload --pull --roots $num_roots + +if test "x$SPINDLE_BLUEGENE" != "xtrue"; then +./run_driver --dependency --fork --roots $num_roots +./run_driver --dlopen --fork --roots $num_roots +./run_driver --dlreopen --fork --roots $num_roots +./run_driver --reorder --fork --roots $num_roots +./run_driver --partial --fork --roots $num_roots +./run_driver --ldpreload --fork --roots $num_roots +fi + + +if test "x$SPINDLE_BLUEGENE" != "xtrue"; then +./run_driver --dependency --forkexec --roots $num_roots +./run_driver --dlopen --forkexec --roots $num_roots +./run_driver --dlreopen --forkexec --roots $num_roots +./run_driver --reorder --forkexec --roots $num_roots +./run_driver --partial --forkexec --roots $num_roots +./run_driver --ldpreload --forkexec --roots $num_roots +fi + +./run_driver --dependency --chdir --roots $num_roots +./run_driver --dlopen --chdir --roots $num_roots +./run_driver --dlreopen --chdir --roots $num_roots +./run_driver --reorder --chdir --roots $num_roots +./run_driver --partial --chdir --roots $num_roots +./run_driver --ldpreload --chdir--roots $num_roots + +./run_driver --dependency --preload --roots $num_roots +./run_driver --dlopen --preload --roots $num_roots +./run_driver --dlreopen --preload --roots $num_roots +./run_driver --reorder --preload --roots $num_roots +./run_driver --partial --preload --roots $num_roots +./run_driver --ldpreload --preload --roots $num_roots + + +