Search
j0ke.net Open Build Service
>
Projects
>
GFS
:
experimental
>
openais
> revision-1499.patch
Sign Up
|
Log In
Username
Password
Cancel
Overview
Repositories
Revisions
Requests
Users
Advanced
Attributes
Meta
File revision-1499.patch of Package openais (Revision 1)
Currently displaying revision
1
,
show latest
Index: exec/cfg.c =================================================================== --- exec/cfg.c (revision 1498) +++ exec/cfg.c (revision 1499) @@ -268,7 +268,7 @@ res_lib_cfg_ringreenable.header.id = MESSAGE_RES_CFG_RINGREENABLE; res_lib_cfg_ringreenable.header.size = sizeof (struct res_lib_cfg_ringreenable); res_lib_cfg_ringreenable.header.error = SA_AIS_OK; - openais_conn_send_response ( + openais_response_send ( req_exec_cfg_ringreenable->source.conn, &res_lib_cfg_ringreenable, sizeof (struct res_lib_cfg_ringreenable)); @@ -312,7 +312,7 @@ strcpy ((char *)&res_lib_cfg_ringstatusget.interface_name[i], totem_ip_string); } - openais_conn_send_response ( + openais_response_send ( conn, &res_lib_cfg_ringstatusget, sizeof (struct res_lib_cfg_ringstatusget)); Index: exec/evs.c =================================================================== --- exec/evs.c (revision 1498) +++ exec/evs.c (revision 1499) @@ -244,7 +244,8 @@ */ for (list = confchg_notify.next; list != &confchg_notify; list = list->next) { evs_pd = list_entry (list, struct evs_pd, list); - openais_conn_send_response (evs_pd->conn, + openais_dispatch_send ( + evs_pd->conn, &res_evs_confchg_callback, sizeof (res_evs_confchg_callback)); } @@ -262,7 +263,9 @@ list_init (&evs_pd->list); list_add (&evs_pd->list, &confchg_notify); - openais_conn_send_response (conn, &res_evs_confchg_callback, + openais_dispatch_send ( + conn, + &res_evs_confchg_callback, sizeof (res_evs_confchg_callback)); return (0); @@ -308,7 +311,9 @@ res_lib_evs_join.header.id = MESSAGE_RES_EVS_JOIN; res_lib_evs_join.header.error = error; - openais_conn_send_response (conn, &res_lib_evs_join, + openais_response_send ( + conn, + &res_lib_evs_join, sizeof (struct res_lib_evs_join)); } @@ -354,7 +359,9 @@ res_lib_evs_leave.header.id = MESSAGE_RES_EVS_LEAVE; res_lib_evs_leave.header.error = error; - openais_conn_send_response (conn, &res_lib_evs_leave, + openais_response_send ( + conn, + &res_lib_evs_leave, sizeof (struct res_lib_evs_leave)); } @@ -397,7 +404,9 @@ res_lib_evs_mcast_joined.header.id = MESSAGE_RES_EVS_MCAST_JOINED; res_lib_evs_mcast_joined.header.error = error; - openais_conn_send_response (conn, &res_lib_evs_mcast_joined, + openais_response_send ( + conn, + &res_lib_evs_mcast_joined, sizeof (struct res_lib_evs_mcast_joined)); } @@ -443,7 +452,9 @@ res_lib_evs_mcast_groups.header.id = MESSAGE_RES_EVS_MCAST_GROUPS; res_lib_evs_mcast_groups.header.error = error; - openais_conn_send_response (conn, &res_lib_evs_mcast_groups, + openais_response_send ( + conn, + &res_lib_evs_mcast_groups, sizeof (struct res_lib_evs_mcast_groups)); } @@ -462,7 +473,9 @@ res_lib_evs_membership_get.member_list_entries = res_evs_confchg_callback.member_list_entries; - openais_conn_send_response (conn, &res_lib_evs_membership_get, + openais_response_send ( + conn, + &res_lib_evs_membership_get, sizeof (struct res_lib_evs_membership_get)); } @@ -487,8 +500,10 @@ int i, j; struct evs_pd *evs_pd; - res_evs_deliver_callback.header.size = sizeof (struct res_evs_deliver_callback) + - req_exec_evs_mcast->msg_len; + res_evs_deliver_callback.header.size = + sizeof (struct res_evs_deliver_callback) + + req_exec_evs_mcast->msg_len; + res_evs_deliver_callback.header.id = MESSAGE_RES_EVS_DELIVER_CALLBACK; res_evs_deliver_callback.header.error = SA_AIS_OK; res_evs_deliver_callback.msglen = req_exec_evs_mcast->msg_len; @@ -517,9 +532,13 @@ if (found) { res_evs_deliver_callback.local_nodeid = nodeid; - openais_conn_send_response (evs_pd->conn, &res_evs_deliver_callback, + openais_dispatch_send ( + evs_pd->conn, + &res_evs_deliver_callback, sizeof (struct res_evs_deliver_callback)); - openais_conn_send_response (evs_pd->conn, msg_addr, + openais_dispatch_send ( + evs_pd->conn, + msg_addr, req_exec_evs_mcast->msg_len); } } Index: exec/cpg.c =================================================================== --- exec/cpg.c (revision 1498) +++ exec/cpg.c (revision 1499) @@ -1,5 +1,5 @@ /* - * Copyright (c) 2006-2007 Red Hat, Inc. + * Copyright (c) 2006 Red Hat, Inc. * Copyright (c) 2006 Sun Microsystems, Inc. * * All rights reserved. @@ -32,6 +32,9 @@ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ +#ifndef OPENAIS_BSD +#include <alloca.h> +#endif #include <sys/types.h> #include <sys/socket.h> #include <sys/un.h> @@ -60,8 +63,11 @@ #include "totempg.h" #include "totemip.h" #include "main.h" +#include "flow.h" +#include "tlist.h" #include "ipc.h" #include "mempool.h" +#include "objdb.h" #include "service.h" #include "jhash.h" #include "swab.h" @@ -255,7 +261,7 @@ }; struct openais_service_handler cpg_service_handler = { - .name = (unsigned char*)"openais cluster closed process group service v1.01", + .name = (unsigned char *)"openais cluster closed process group service v1.01", .id = CPG_SERVICE, .private_data_size = sizeof (struct process_info), .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED, @@ -434,14 +440,14 @@ } if (conn) { - openais_conn_send_response(conn, buf, size); + openais_dispatch_send(conn, buf, size); } else { /* Send it to all listeners */ for (iter = gi->members.next, tmp=iter->next; iter != &gi->members; iter = tmp, tmp=iter->next) { struct process_info *pi = list_entry(iter, struct process_info, list); if (pi->trackerconn && (pi->flags & PI_FLAG_MEMBER)) { - if (openais_conn_send_response(pi->trackerconn, buf, size) == -1) { + if (openais_dispatch_send(pi->trackerconn, buf, size) == -1) { // Error ?? } } @@ -477,14 +483,17 @@ struct group_info *gi = pi->group; mar_cpg_address_t notify_info; - log_printf(LOG_LEVEL_DEBUG, "exit_fn for conn=%p\n", conn); - if (gi) { notify_info.pid = pi->pid; notify_info.nodeid = totempg_my_nodeid_get(); notify_info.reason = CONFCHG_CPG_REASON_PROCDOWN; cpg_node_joinleave_send(gi, pi, MESSAGE_REQ_EXEC_CPG_PROCLEAVE, CONFCHG_CPG_REASON_PROCDOWN); list_del(&pi->list); + openais_ipc_flow_control_destroy ( + conn, + CPG_SERVICE, + (unsigned char *)gi->group_name.value, + (unsigned int)gi->group_name.length); } return (0); } @@ -531,7 +540,7 @@ req_exec_cpg_procjoin.header.size = sizeof(req_exec_cpg_procjoin); req_exec_cpg_procjoin.header.id = SERVICE_ID_MAKE(CPG_SERVICE, fn); - req_exec_cpg_iovec.iov_base = &req_exec_cpg_procjoin; + req_exec_cpg_iovec.iov_base = (char *)&req_exec_cpg_procjoin; req_exec_cpg_iovec.iov_len = sizeof(req_exec_cpg_procjoin); result = totempg_groups_mcast_joined (openais_group_handle, &req_exec_cpg_iovec, 1, TOTEMPG_AGREED); @@ -544,15 +553,17 @@ struct list_head *remlist) { int i; - struct list_head *iter, *iter2, *tmp; + struct list_head *iter, *iter2; struct process_info *pi; struct group_info *gi; for (i=0; i < GROUP_HASH_SIZE; i++) { - for (iter = group_lists[i].next; iter != &group_lists[i]; iter = iter->next) { + for (iter = group_lists[i].next; iter != &group_lists[i];) { gi = list_entry(iter, struct group_info, list); - for (iter2 = gi->members.next, tmp = iter2->next; iter2 != &gi->members; iter2 = tmp, tmp = iter2->next) { + iter = iter->next; + for (iter2 = gi->members.next; iter2 != &gi->members;) { pi = list_entry(iter2, struct process_info, list); + iter2 = iter2->next; if (pi->nodeid == nodeid) { @@ -643,7 +654,7 @@ /* Don't send this message until we get the final configuration message */ if (configuration_type == TOTEM_CONFIGURATION_REGULAR && req_exec_cpg_downlist.left_nodes) { - req_exec_cpg_iovec.iov_base = &req_exec_cpg_downlist; + req_exec_cpg_iovec.iov_base = (char *)&req_exec_cpg_downlist; req_exec_cpg_iovec.iov_len = req_exec_cpg_downlist.header.size; totempg_groups_mcast_joined (openais_group_handle, &req_exec_cpg_iovec, 1, TOTEMPG_AGREED); @@ -816,7 +827,7 @@ struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = (struct req_exec_cpg_procjoin *)message; struct group_info *gi; struct process_info *pi; - struct list_head *iter; + volatile struct list_head *iter; mar_cpg_address_t notify_info; log_printf(LOG_LEVEL_DEBUG, "got procleave message from cluster node %d\n", nodeid); @@ -833,19 +844,28 @@ 1, ¬ify_info, MESSAGE_RES_CPG_CONFCHG_CALLBACK); - /* Find the node/PID to remove */ - for (iter = gi->members.next; iter != &gi->members; iter = iter->next) { + /* + * Find the node/PID to remove + */ + for (iter = gi->members.next; iter != &gi->members;) { pi = list_entry(iter, struct process_info, list); + + iter = iter->next; + if (pi->pid == req_exec_cpg_procjoin->pid && pi->nodeid == nodeid) { + if (list_empty(&gi->members)) { + remove_group(gi); + } + list_del(&pi->list); - if (!pi->conn) + if (pi->conn) { + openais_conn_info_refcnt_dec(pi->conn); + } else { free(pi); + } - if (list_empty(&gi->members)) { - remove_group(gi); - } break; } } @@ -904,6 +924,7 @@ openais_ipc_flow_control_local_decrement (req_exec_cpg_mcast->source.conn); process_info = (struct process_info *)openais_conn_private_data_get (req_exec_cpg_mcast->source.conn); res_lib_cpg_mcast->flow_control_state = process_info->flow_control_state; + openais_conn_info_refcnt_dec (req_exec_cpg_mcast->source.conn); } memcpy(&res_lib_cpg_mcast->group_name, &gi->group_name, sizeof(mar_cpg_name_t)); @@ -914,7 +935,7 @@ for (iter = gi->members.next; iter != &gi->members; iter = iter->next) { struct process_info *pi = list_entry(iter, struct process_info, list); if (pi->trackerconn) { - openais_conn_send_response( + openais_dispatch_send ( pi->trackerconn, buf, res_lib_cpg_mcast->header.size); @@ -993,6 +1014,7 @@ struct process_info *pi = (struct process_info *)openais_conn_private_data_get (conn); pi->conn = conn; + openais_conn_info_refcnt_inc (conn); log_printf(LOG_LEVEL_DEBUG, "lib_init_fn: conn=%p, pi=%p\n", conn, pi); return (0); } @@ -1041,7 +1063,7 @@ res_lib_cpg_join.header.size = sizeof(res_lib_cpg_join); res_lib_cpg_join.header.id = MESSAGE_RES_CPG_JOIN; res_lib_cpg_join.header.error = error; - openais_conn_send_response(conn, &res_lib_cpg_join, sizeof(res_lib_cpg_join)); + openais_response_send(conn, &res_lib_cpg_join, sizeof(res_lib_cpg_join)); } /* Leave message from the library */ @@ -1052,8 +1074,6 @@ struct group_info *gi; SaAisErrorT error = SA_AIS_OK; - log_printf(LOG_LEVEL_DEBUG, "got leave request on %p\n", conn); - if (!pi || !pi->pid || !pi->group) { error = SA_AIS_ERR_INVALID_PARAM; goto leave_ret; @@ -1076,7 +1096,7 @@ res_lib_cpg_leave.header.size = sizeof(res_lib_cpg_leave); res_lib_cpg_leave.header.id = MESSAGE_RES_CPG_LEAVE; res_lib_cpg_leave.header.error = error; - openais_conn_send_response(conn, &res_lib_cpg_leave, sizeof(res_lib_cpg_leave)); + openais_response_send(conn, &res_lib_cpg_leave, sizeof(res_lib_cpg_leave)); } /* Mcast message from the library */ @@ -1099,7 +1119,7 @@ res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_MCAST; res_lib_cpg_mcast.header.error = SA_AIS_ERR_ACCESS; /* TODO Better error code ?? */ res_lib_cpg_mcast.flow_control_state = CPG_FLOW_CONTROL_DISABLED; - openais_conn_send_response(conn, &res_lib_cpg_mcast, + openais_response_send(conn, &res_lib_cpg_mcast, sizeof(res_lib_cpg_mcast)); return; } @@ -1107,15 +1127,16 @@ req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen; req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_MCAST); + openais_conn_info_refcnt_inc (conn); req_exec_cpg_mcast.pid = pi->pid; req_exec_cpg_mcast.msglen = msglen; message_source_set (&req_exec_cpg_mcast.source, conn); memcpy(&req_exec_cpg_mcast.group_name, &gi->group_name, sizeof(mar_cpg_name_t)); - req_exec_cpg_iovec[0].iov_base = &req_exec_cpg_mcast; + req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast; req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast); - req_exec_cpg_iovec[1].iov_base = &req_lib_cpg_mcast->message; + req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message; req_exec_cpg_iovec[1].iov_len = msglen; // TODO: guarantee type... @@ -1126,7 +1147,7 @@ res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_MCAST; res_lib_cpg_mcast.header.error = SA_AIS_OK; res_lib_cpg_mcast.flow_control_state = pi->flow_control_state; - openais_conn_send_response(conn, &res_lib_cpg_mcast, + openais_response_send(conn, &res_lib_cpg_mcast, sizeof(res_lib_cpg_mcast)); } @@ -1140,7 +1161,7 @@ res.size = sizeof(res); res.id = MESSAGE_RES_CPG_MEMBERSHIP; res.error = SA_AIS_ERR_ACCESS; /* TODO Better error code */ - openais_conn_send_response(conn, &res, sizeof(res)); + openais_response_send(conn, &res, sizeof(res)); return; } @@ -1154,7 +1175,6 @@ struct res_lib_cpg_trackstart res_lib_cpg_trackstart; struct group_info *gi; struct process_info *otherpi; - void *otherconn; SaAisErrorT error = SA_AIS_OK; log_printf(LOG_LEVEL_DEBUG, "got trackstart request on %p\n", conn); @@ -1166,7 +1186,6 @@ } /* Find the partner connection and add us to it's process_info struct */ - otherconn = openais_conn_partner_get (conn); otherpi = (struct process_info *)openais_conn_private_data_get (conn); otherpi->trackerconn = conn; @@ -1174,7 +1193,7 @@ res_lib_cpg_trackstart.header.size = sizeof(res_lib_cpg_trackstart); res_lib_cpg_trackstart.header.id = MESSAGE_RES_CPG_TRACKSTART; res_lib_cpg_trackstart.header.error = SA_AIS_OK; - openais_conn_send_response(conn, &res_lib_cpg_trackstart, sizeof(res_lib_cpg_trackstart)); + openais_response_send(conn, &res_lib_cpg_trackstart, sizeof(res_lib_cpg_trackstart)); } static void message_handler_req_lib_cpg_trackstop (void *conn, void *message) @@ -1182,7 +1201,6 @@ struct req_lib_cpg_trackstop *req_lib_cpg_trackstop = (struct req_lib_cpg_trackstop *)message; struct res_lib_cpg_trackstop res_lib_cpg_trackstop; struct process_info *otherpi; - void *otherconn; struct group_info *gi; SaAisErrorT error = SA_AIS_OK; @@ -1195,7 +1213,6 @@ } /* Find the partner connection and add us to it's process_info struct */ - otherconn = openais_conn_partner_get (conn); otherpi = (struct process_info *)openais_conn_private_data_get (conn); otherpi->trackerconn = NULL; @@ -1203,7 +1220,7 @@ res_lib_cpg_trackstop.header.size = sizeof(res_lib_cpg_trackstop); res_lib_cpg_trackstop.header.id = MESSAGE_RES_CPG_TRACKSTOP; res_lib_cpg_trackstop.header.error = SA_AIS_OK; - openais_conn_send_response(conn, &res_lib_cpg_trackstop.header, sizeof(res_lib_cpg_trackstop)); + openais_response_send(conn, &res_lib_cpg_trackstop.header, sizeof(res_lib_cpg_trackstop)); } static void message_handler_req_lib_cpg_local_get (void *conn, void *message) @@ -1213,8 +1230,8 @@ res_lib_cpg_local_get.header.size = sizeof(res_lib_cpg_local_get); res_lib_cpg_local_get.header.id = MESSAGE_RES_CPG_LOCAL_GET; res_lib_cpg_local_get.header.error = SA_AIS_OK; - res_lib_cpg_local_get.local_nodeid = totempg_my_nodeid_get(); + res_lib_cpg_local_get.local_nodeid = totempg_my_nodeid_get (); - openais_conn_send_response(conn, &res_lib_cpg_local_get, + openais_response_send(conn, &res_lib_cpg_local_get, sizeof(res_lib_cpg_local_get)); } Index: exec/clm.c =================================================================== --- exec/clm.c (revision 1498) +++ exec/clm.c (revision 1499) @@ -407,7 +407,7 @@ /* * Send notifications to all CLM listeners */ - openais_conn_send_response ( + openais_dispatch_send ( clm_pd->conn, &res_lib_clm_clustertrack, sizeof (struct res_lib_clm_clustertrack)); @@ -667,14 +667,16 @@ list_add (&clm_pd->list, &library_notification_send_listhead); } - openais_conn_send_response (conn, &res_lib_clm_clustertrack, + openais_response_send ( + conn, + &res_lib_clm_clustertrack, sizeof (struct res_lib_clm_clustertrack)); if (req_lib_clm_clustertrack->return_in_callback) { res_lib_clm_clustertrack.header.id = MESSAGE_RES_CLM_TRACKCALLBACK; - openais_conn_send_response ( - openais_conn_partner_get (conn), + openais_dispatch_send ( + conn, &res_lib_clm_clustertrack, sizeof (struct res_lib_clm_clustertrack)); } @@ -699,7 +701,9 @@ list_del (&clm_pd->list); list_init (&clm_pd->list); - openais_conn_send_response (conn, &res_lib_clm_trackstop, + openais_response_send ( + conn, + &res_lib_clm_trackstop, sizeof (struct res_lib_clm_trackstop)); } @@ -735,7 +739,11 @@ if (valid) { memcpy (&res_clm_nodeget.cluster_node, cluster_node, sizeof (mar_clm_cluster_node_t)); } - openais_conn_send_response (conn, &res_clm_nodeget, sizeof (struct res_clm_nodeget)); + + openais_response_send ( + conn, + &res_clm_nodeget, + sizeof (struct res_clm_nodeget)); } static void message_handler_req_lib_clm_nodegetasync (void *conn, void *msg) @@ -770,7 +778,9 @@ res_clm_nodegetasync.header.id = MESSAGE_RES_CLM_NODEGETASYNC; res_clm_nodegetasync.header.error = SA_AIS_OK; - openais_conn_send_response (conn, &res_clm_nodegetasync, + openais_response_send ( + conn, + &res_clm_nodegetasync, sizeof (struct res_clm_nodegetasync)); /* @@ -784,7 +794,8 @@ memcpy (&res_clm_nodegetcallback.cluster_node, cluster_node, sizeof (mar_clm_cluster_node_t)); } - openais_conn_send_response (openais_conn_partner_get (conn), + openais_dispatch_send ( + conn, &res_clm_nodegetcallback, sizeof (struct res_clm_nodegetcallback)); } Index: exec/ipc.c =================================================================== --- exec/ipc.c (revision 1498) +++ exec/ipc.c (revision 1499) @@ -1,5 +1,4 @@ /* - * Copyright (c) 2002-2006 MontaVista Software, Inc. * Copyright (c) 2006-2007 Red Hat, Inc. * * All rights reserved. @@ -54,7 +53,11 @@ #include <signal.h> #include <sched.h> #include <time.h> +#if defined(OPENAIS_SOLARIS) && defined(HAVE_GETPEERUCRED) +#include <ucred.h> +#endif +#include "swab.h" #include "../include/saAis.h" #include "../include/list.h" #include "../include/queue.h" @@ -66,6 +69,8 @@ #include "mainconfig.h" #include "totemconfig.h" #include "main.h" +#include "flow.h" +#include "tlist.h" #include "ipc.h" #include "flow.h" #include "service.h" @@ -79,6 +84,10 @@ #include "util.h" +#ifdef OPENAIS_SOLARIS +#define MSG_NOSIGNAL 0 +#endif + #define SERVER_BACKLOG 5 /* @@ -96,8 +105,10 @@ static totempg_groups_handle ipc_handle; -DECLARE_LIST_INIT (conn_info_list_head); +static pthread_mutex_t conn_io_list_mutex = PTHREAD_MUTEX_INITIALIZER; +DECLARE_LIST_INIT (conn_io_list_head); + static void (*ipc_serialize_lock_fn) (void); static void (*ipc_serialize_unlock_fn) (void); @@ -107,148 +118,282 @@ size_t mlen; }; -enum conn_state { - CONN_STATE_ACTIVE, - CONN_STATE_SECURITY, - CONN_STATE_REQUESTED, - CONN_STATE_CLOSED, - CONN_STATE_DISCONNECTED +enum conn_io_state { + CONN_IO_STATE_INITIALIZING, + CONN_IO_STATE_AUTHENTICATED, + CONN_IO_STATE_INIT_FAILED }; +enum conn_info_state { + CONN_INFO_STATE_INITIALIZING, + CONN_INFO_STATE_ACTIVE, + CONN_INFO_STATE_DISCONNECT_REQUESTED, + CONN_INFO_STATE_DISCONNECTED +}; + +struct conn_info; + +struct conn_io { + int fd; /* File descriptor */ + unsigned int events; /* events polled for by file descriptor */ + pthread_t thread; /* thread identifier */ + pthread_attr_t thread_attr; /* thread attribute */ + char *inb; /* Input buffer for non-blocking reads */ + int inb_nextheader; /* Next message header starts here */ + int inb_start; /* Start location of input buffer */ + int inb_inuse; /* Bytes currently stored in input buffer */ + struct queue outq; /* Circular queue for outgoing requests */ + int byte_start; /* Byte to start sending from in head of queue */ + unsigned int fcc; /* flow control local count */ + enum conn_io_state state; /* state of this conn_io connection */ + struct conn_info *conn_info; /* connection information combining multiple conn_io structs */ + unsigned int refcnt; /* reference count for conn_io data structure */ + pthread_mutex_t mutex; + unsigned int service; + struct list_head list; +}; + + struct conn_info { - int fd; /* File descriptor */ - unsigned int events; /* events polled for by file descriptor */ - enum conn_state state; /* State of this connection */ - pthread_t thread; /* thread identifier */ - pthread_attr_t thread_attr; /* thread attribute */ - char *inb; /* Input buffer for non-blocking reads */ - int inb_nextheader; /* Next message header starts here */ - int inb_start; /* Start location of input buffer */ - int inb_inuse; /* Bytes currently stored in input buffer */ - struct queue outq; /* Circular queue for outgoing requests */ - int byte_start; /* Byte to start sending from in head of queue */ - enum service_types service;/* Type of service so dispatch knows how to route message */ - int authenticated; /* Is this connection authenticated? */ - void *private_data; /* library connection private data */ - struct conn_info *conn_info_partner; /* partner connection dispatch<->response */ + enum conn_info_state state; /* State of this connection */ + enum service_types service; /* Type of service so dispatch knows how to route message */ + void *private_data; /* library connection private data */ unsigned int flow_control_handle; /* flow control identifier */ unsigned int flow_control_enabled; /* flow control enabled bit */ - unsigned int flow_control_local_count; /* flow control local count */ enum openais_flow_control flow_control; /* Does this service use IPC flow control */ pthread_mutex_t flow_control_mutex; + unsigned int flow_control_local_count; /* flow control local count */ int (*lib_exit_fn) (void *conn); - struct timerlist timerlist; pthread_mutex_t mutex; - pthread_mutex_t *shared_mutex; - struct list_head list; + struct conn_io *conn_io_response; + struct conn_io *conn_io_dispatch; + unsigned int refcnt; }; -static void *prioritized_poll_thread (void *conn); -static int conn_info_outq_flush (struct conn_info *conn_info); -static void libais_deliver (struct conn_info *conn_info); -static void ipc_flow_control (struct conn_info *conn_info); +static void *prioritized_poll_thread (void *conn_io_in); +static int conn_io_outq_flush (struct conn_io *conn_io); +static void conn_io_deliver (struct conn_io *conn_io); +//static void ipc_flow_control (struct conn_info *conn_info); +static inline void conn_info_destroy (struct conn_info *conn_info); +static void conn_io_destroy (struct conn_io *conn_io); +static int conn_io_send (struct conn_io *conn_io, void *msg, int mlen); +static inline struct conn_info *conn_info_create (void); +static int conn_io_found (struct conn_io *conn_io_to_match); +static int response_init_send (struct conn_io *conn_io, void *message); +static int dispatch_init_send (struct conn_io *conn_io, void *message); + /* * IPC Initializers */ -static int response_init_send_response ( - struct conn_info *conn_info, - void *message); -static int dispatch_init_send_response ( - struct conn_info *conn_info, - void *message); +static int conn_io_refcnt_value (struct conn_io *conn_io) +{ + unsigned int refcnt; -static int (*ais_init_service[]) (struct conn_info *conn_info, void *message) = { - response_init_send_response, - dispatch_init_send_response + pthread_mutex_lock (&conn_io->mutex); + refcnt = conn_io->refcnt; + pthread_mutex_unlock (&conn_io->mutex); + + return (refcnt); +} + +static void conn_io_refcnt_inc (struct conn_io *conn_io) +{ + pthread_mutex_lock (&conn_io->mutex); + conn_io->refcnt += 1; + pthread_mutex_unlock (&conn_io->mutex); +} + +static int conn_io_refcnt_dec (struct conn_io *conn_io) +{ + unsigned int refcnt; + + pthread_mutex_lock (&conn_io->mutex); + conn_io->refcnt -= 1; + refcnt = conn_io->refcnt; + pthread_mutex_unlock (&conn_io->mutex); + + return (refcnt); +} + +static void conn_info_refcnt_inc (struct conn_info *conn_info) +{ + /* + * Connection not fully initialized yet + */ + if (conn_info == NULL) { + return; + } + pthread_mutex_lock (&conn_info->mutex); + conn_info->refcnt += 1; + pthread_mutex_unlock (&conn_info->mutex); +} + +static void conn_info_refcnt_dec (struct conn_info *conn_info) +{ + int refcnt; + + /* + * Connection not fully initialized yet + */ + if (conn_info == NULL) { + return; + } + pthread_mutex_lock (&conn_info->mutex); + conn_info->refcnt -= 1; + refcnt = conn_info->refcnt; + assert (refcnt >= 0); + pthread_mutex_unlock (&conn_info->mutex); + + if (refcnt == 0) { + conn_info_destroy (conn_info); + } +} + +void openais_conn_info_refcnt_dec (void *conn) +{ + struct conn_info *conn_info = (struct conn_info *)conn; + + conn_info_refcnt_dec (conn_info); +} + +void openais_conn_info_refcnt_inc (void *conn) +{ + struct conn_info *conn_info = (struct conn_info *)conn; + + conn_info_refcnt_inc (conn_info); +} + +static int (*ais_init_service[]) (struct conn_io *conn_io, void *message) = { + response_init_send, + dispatch_init_send }; -static void libais_disconnect_security (struct conn_info *conn_info) +static void disconnect_request (struct conn_info *conn_info) { - conn_info->state = CONN_STATE_SECURITY; - close (conn_info->fd); +unsigned int res; + /* + * connection not fully active yet + */ + if (conn_info == NULL) { + return; + } + /* + * We only want to decrement the reference count on these two + * conn_io contexts one time + */ + if (conn_info->state != CONN_INFO_STATE_ACTIVE) { + return; + } + res = conn_io_refcnt_dec (conn_info->conn_io_response); + res = conn_io_refcnt_dec (conn_info->conn_io_dispatch); + conn_info->state = CONN_INFO_STATE_DISCONNECT_REQUESTED; } -static int response_init_send_response ( - struct conn_info *conn_info, +static int response_init_send ( + struct conn_io *conn_io, void *message) { SaAisErrorT error = SA_AIS_ERR_ACCESS; - uintptr_t cinfo = (uintptr_t)conn_info; + uintptr_t cinfo = (uintptr_t)conn_io; mar_req_lib_response_init_t *req_lib_response_init = (mar_req_lib_response_init_t *)message; mar_res_lib_response_init_t res_lib_response_init; - if (conn_info->authenticated) { - conn_info->service = req_lib_response_init->resdis_header.service; + if (conn_io->state == CONN_IO_STATE_AUTHENTICATED) { error = SA_AIS_OK; + conn_io->service = req_lib_response_init->resdis_header.service; } res_lib_response_init.header.size = sizeof (mar_res_lib_response_init_t); res_lib_response_init.header.id = MESSAGE_RES_INIT; res_lib_response_init.header.error = error; res_lib_response_init.conn_info = (mar_uint64_t)cinfo; - openais_conn_send_response ( - conn_info, + conn_io_send ( + conn_io, &res_lib_response_init, sizeof (res_lib_response_init)); if (error == SA_AIS_ERR_ACCESS) { - libais_disconnect_security (conn_info); + conn_io_destroy (conn_io); return (-1); } + return (0); } -static int dispatch_init_send_response ( - struct conn_info *conn_info, +/* + * This is called iwth ipc_serialize_lock_fn() called + * Therefore there are no races with the destruction of the conn_io + * data structure + */ +static int dispatch_init_send ( + struct conn_io *conn_io, void *message) { SaAisErrorT error = SA_AIS_ERR_ACCESS; uintptr_t cinfo; mar_req_lib_dispatch_init_t *req_lib_dispatch_init = (mar_req_lib_dispatch_init_t *)message; mar_res_lib_dispatch_init_t res_lib_dispatch_init; - struct conn_info *msg_conn_info; + struct conn_io *msg_conn_io; + struct conn_info *conn_info; + unsigned int service; - if (conn_info->authenticated) { - conn_info->service = req_lib_dispatch_init->resdis_header.service; - if (!ais_service[req_lib_dispatch_init->resdis_header.service]) + service = req_lib_dispatch_init->resdis_header.service; + cinfo = (uintptr_t)req_lib_dispatch_init->conn_info; + msg_conn_io = (struct conn_io *)cinfo; + + /* + * The response IPC connection has disconnected already for + * some reason and is no longer referenceable in the system + */ + if (conn_io->state == CONN_IO_STATE_AUTHENTICATED) { + /* + * If the response conn_io isn't found, it disconnected. + * Hence, a full connection cannot be made and this connection + * should be aborted by the poll thread + */ + if (conn_io_found (msg_conn_io) == 0) { + error = SA_AIS_ERR_TRY_AGAIN; + conn_io->state = CONN_IO_STATE_INIT_FAILED; + } else + /* + * If no service is found for the requested library service, + * the proper service handler isn't loaded and this connection + * should be aborted by the poll thread + */ + if (ais_service[service] == NULL) { error = SA_AIS_ERR_NOT_SUPPORTED; - else + conn_io->state = CONN_IO_STATE_INIT_FAILED; + } else { error = SA_AIS_OK; + } - cinfo = (uintptr_t)req_lib_dispatch_init->conn_info; - conn_info->conn_info_partner = (struct conn_info *)cinfo; - - /* temporary fix for memory leak + /* + * The response and dispatch conn_io structures are available. + * Attempt to allocate the appropriate memory for the private + * data area */ - pthread_mutex_destroy (conn_info->conn_info_partner->shared_mutex); - free (conn_info->conn_info_partner->shared_mutex); - - conn_info->conn_info_partner->shared_mutex = conn_info->shared_mutex; - - list_add (&conn_info_list_head, &conn_info->list); - list_add (&conn_info_list_head, &conn_info->conn_info_partner->list); - - msg_conn_info = (struct conn_info *)cinfo; - msg_conn_info->conn_info_partner = conn_info; - if (error == SA_AIS_OK) { int private_data_size; - private_data_size = ais_service[req_lib_dispatch_init->resdis_header.service]->private_data_size; + conn_info = conn_info_create (); + private_data_size = ais_service[service]->private_data_size; if (private_data_size) { conn_info->private_data = malloc (private_data_size); - conn_info->conn_info_partner->private_data = conn_info->private_data; + /* + * No private data could be allocated so + * request the poll thread to abort + */ if (conn_info->private_data == NULL) { + conn_io->state = CONN_IO_STATE_INIT_FAILED; error = SA_AIS_ERR_NO_MEMORY; } else { memset (conn_info->private_data, 0, private_data_size); } } else { conn_info->private_data = NULL; - conn_info->conn_info_partner->private_data = NULL; } } } @@ -257,338 +402,340 @@ res_lib_dispatch_init.header.id = MESSAGE_RES_INIT; res_lib_dispatch_init.header.error = error; - openais_conn_send_response ( - conn_info, - &res_lib_dispatch_init, - sizeof (res_lib_dispatch_init)); + if (error != SA_AIS_OK) { + conn_io_send ( + conn_io, + &res_lib_dispatch_init, + sizeof (res_lib_dispatch_init)); - if (error == SA_AIS_ERR_ACCESS) { - libais_disconnect_security (conn_info); return (-1); } - if (error != SA_AIS_OK) { - return (-1); - } - conn_info->state = CONN_STATE_ACTIVE; - conn_info->conn_info_partner->state = CONN_STATE_ACTIVE; - conn_info->lib_exit_fn = ais_service[conn_info->service]->lib_exit_fn; + /* + * connect both dispatch and response conn_ios into the conn_info + * data structure + */ + conn_info->state = CONN_INFO_STATE_ACTIVE; + conn_info->lib_exit_fn = ais_service[service]->lib_exit_fn; + conn_info->conn_io_response = msg_conn_io; + conn_info->conn_io_response->conn_info = conn_info; + conn_info->conn_io_dispatch = conn_io; + conn_info->service = service; + conn_io->service = service; + conn_io->conn_info = conn_info; ais_service[conn_info->service]->lib_init_fn (conn_info); conn_info->flow_control = ais_service[conn_info->service]->flow_control; - conn_info->conn_info_partner->flow_control = ais_service[conn_info->service]->flow_control; if (ais_service[conn_info->service]->flow_control == OPENAIS_FLOW_CONTROL_REQUIRED) { openais_flow_control_ipc_init ( &conn_info->flow_control_handle, conn_info->service); } + + /* + * Tell the library the IPC connections are configured + */ + conn_io_send ( + conn_io, + &res_lib_dispatch_init, + sizeof (res_lib_dispatch_init)); return (0); } /* * Create a connection data structure */ -static inline unsigned int conn_info_create (int fd) { +static inline struct conn_info *conn_info_create (void) +{ struct conn_info *conn_info; - int res; conn_info = malloc (sizeof (struct conn_info)); if (conn_info == 0) { - return (ENOMEM); + return (NULL); } memset (conn_info, 0, sizeof (struct conn_info)); - res = queue_init (&conn_info->outq, SIZEQUEUE, + conn_info->refcnt = 2; + pthread_mutex_init (&conn_info->mutex, NULL); + conn_info->state = CONN_INFO_STATE_INITIALIZING; + + return (conn_info); +} + +static inline void conn_info_destroy (struct conn_info *conn_info) +{ + if (conn_info->private_data) { + free (conn_info->private_data); + } + pthread_mutex_destroy (&conn_info->mutex); + free (conn_info); +} + +static int conn_io_create (int fd) +{ + int res; + struct conn_io *conn_io; + + conn_io = malloc (sizeof (struct conn_io)); + if (conn_io == NULL) { + return (-1); + } + memset (conn_io, 0, sizeof (struct conn_io)); + + res = queue_init (&conn_io->outq, SIZEQUEUE, sizeof (struct outq_item)); if (res != 0) { - free (conn_info); - return (ENOMEM); + return (-1); } - conn_info->inb = malloc (sizeof (char) * SIZEINB); - if (conn_info->inb == NULL) { - queue_free (&conn_info->outq); - free (conn_info); - return (ENOMEM); + + conn_io->inb = malloc (sizeof (char) * SIZEINB); + if (conn_io->inb == NULL) { + queue_free (&conn_io->outq); + return (-1); } - conn_info->shared_mutex = malloc (sizeof (pthread_mutex_t)); - if (conn_info->shared_mutex == NULL) { - free (conn_info->inb); - queue_free (&conn_info->outq); - free (conn_info); - return (ENOMEM); - } - pthread_mutex_init (&conn_info->mutex, NULL); - pthread_mutex_init (&conn_info->flow_control_mutex, NULL); - pthread_mutex_init (conn_info->shared_mutex, NULL); + conn_io->fd = fd; + conn_io->events = POLLIN|POLLNVAL; + conn_io->refcnt = 1; + conn_io->service = SOCKET_SERVICE_INIT; + conn_io->state = CONN_IO_STATE_INITIALIZING; - list_init (&conn_info->list); - conn_info->state = CONN_STATE_ACTIVE; - conn_info->fd = fd; - conn_info->events = POLLIN|POLLNVAL; - conn_info->service = SOCKET_SERVICE_INIT; + pthread_attr_init (&conn_io->thread_attr); - pthread_attr_init (&conn_info->thread_attr); -/* - * IA64 needs more stack space then other arches - */ + pthread_mutex_init (&conn_io->mutex, NULL); + + /* + * IA64 needs more stack space then other arches + */ #if defined(__ia64__) - pthread_attr_setstacksize (&conn_info->thread_attr, 400000); + pthread_attr_setstacksize (&conn_io->thread_attr, 400000); #else - pthread_attr_setstacksize (&conn_info->thread_attr, 200000); + pthread_attr_setstacksize (&conn_io->thread_attr, 200000); #endif - pthread_attr_setdetachstate (&conn_info->thread_attr, PTHREAD_CREATE_DETACHED); - res = pthread_create (&conn_info->thread, &conn_info->thread_attr, - prioritized_poll_thread, conn_info); + pthread_attr_setdetachstate (&conn_io->thread_attr, PTHREAD_CREATE_DETACHED); + + res = pthread_create (&conn_io->thread, &conn_io->thread_attr, + prioritized_poll_thread, conn_io); + + list_init (&conn_io->list); + + pthread_mutex_lock (&conn_io_list_mutex); + list_add (&conn_io->list, &conn_io_list_head); + pthread_mutex_unlock (&conn_io_list_mutex); return (res); } -static void conn_info_destroy (struct conn_info *conn_info) +static void conn_io_destroy (struct conn_io *conn_io) { struct outq_item *outq_item; /* * Free the outq queued items */ - while (!queue_is_empty (&conn_info->outq)) { - outq_item = queue_item_get (&conn_info->outq); + while (!queue_is_empty (&conn_io->outq)) { + outq_item = queue_item_get (&conn_io->outq); free (outq_item->msg); - queue_item_remove (&conn_info->outq); + queue_item_remove (&conn_io->outq); } - queue_free (&conn_info->outq); - free (conn_info->inb); - if (conn_info->conn_info_partner) { - conn_info->conn_info_partner->conn_info_partner = NULL; - } + queue_free (&conn_io->outq); + free (conn_io->inb); + close (conn_io->fd); + pthread_mutex_lock (&conn_io_list_mutex); + list_del (&conn_io->list); + pthread_mutex_unlock (&conn_io_list_mutex); - pthread_attr_destroy (&conn_info->thread_attr); - pthread_mutex_destroy (&conn_info->mutex); - pthread_mutex_destroy (&conn_info->flow_control_mutex); - - list_del (&conn_info->list); - free (conn_info); + pthread_attr_destroy (&conn_io->thread_attr); + pthread_mutex_destroy (&conn_io->mutex); + free (conn_io); } -static int libais_connection_active (struct conn_info *conn_info) +static int conn_io_found (struct conn_io *conn_io_to_match) { - return (conn_info->state == CONN_STATE_ACTIVE); -} + struct list_head *list; + struct conn_io *conn_io; -static void libais_disconnect_request (struct conn_info *conn_info) -{ - if (conn_info->state == CONN_STATE_ACTIVE) { - conn_info->state = CONN_STATE_REQUESTED; - conn_info->conn_info_partner->state = CONN_STATE_REQUESTED; + for (list = conn_io_list_head.next; list != &conn_io_list_head; + list = list->next) { + + conn_io = list_entry (list, struct conn_io, list); + if (conn_io == conn_io_to_match) { + return (1); + } } -} -static int libais_disconnect (struct conn_info *conn_info) -{ - int res = 0; - - assert (conn_info->state != CONN_STATE_ACTIVE); - - if (conn_info->state == CONN_STATE_DISCONNECTED) { - assert (0); - } - - /* - * Close active connections - */ - if (conn_info->state == CONN_STATE_ACTIVE || conn_info->state == CONN_STATE_REQUESTED) { - close (conn_info->fd); - conn_info->state = CONN_STATE_CLOSED; - close (conn_info->conn_info_partner->fd); - conn_info->conn_info_partner->state = CONN_STATE_CLOSED; - } - - /* - * Note we will only call the close operation once on the first time - * one of the connections is closed - */ - if (conn_info->state == CONN_STATE_CLOSED) { - if (conn_info->lib_exit_fn) { - res = conn_info->lib_exit_fn (conn_info); - } - if (res == -1) { - return (-1); - } - if (conn_info->conn_info_partner->lib_exit_fn) { - res = conn_info->conn_info_partner->lib_exit_fn (conn_info); - } - if (res == -1) { - return (-1); - } - } - conn_info->state = CONN_STATE_DISCONNECTED; - conn_info->conn_info_partner->state = CONN_STATE_DISCONNECTED; - if (conn_info->flow_control_enabled == 1) { - openais_flow_control_disable (conn_info->flow_control_handle); - } return (0); } -static inline void conn_info_mutex_lock ( - struct conn_info *conn_info, - unsigned int service) -{ - if (service == SOCKET_SERVICE_INIT) { - pthread_mutex_lock (&conn_info->mutex); - } else { - pthread_mutex_lock (conn_info->shared_mutex); - } -} -static inline void conn_info_mutex_unlock ( - struct conn_info *conn_info, - unsigned int service) -{ - if (service == SOCKET_SERVICE_INIT) { - pthread_mutex_unlock (&conn_info->mutex); - } else { - pthread_mutex_unlock (conn_info->shared_mutex); - } -} - /* * This thread runs in a specific thread priority mode to handle - * I/O requests from the library + * I/O requests from or to the library */ -static void *prioritized_poll_thread (void *conn) +static void *prioritized_poll_thread (void *conn_io_in) { - struct conn_info *conn_info = (struct conn_info *)conn; + struct conn_io *conn_io = (struct conn_io *)conn_io_in; + struct conn_info *conn_info = NULL; struct pollfd ufd; int fds; struct sched_param sched_param; int res; - pthread_mutex_t *rel_mutex; - unsigned int service; - struct conn_info *cinfo_partner; - void *private_data; -#if defined(OPENAIS_BSD) || defined(OPENAIS_LINUX) - res = sched_get_priority_max (SCHED_RR); - if (res != -1) { - sched_param.sched_priority = res; - res = pthread_setschedparam (conn_info->thread, SCHED_RR, &sched_param); - if (res == -1) { - log_printf (LOG_LEVEL_WARNING, "Could not set SCHED_RR at priority %d: %s\n", - sched_param.sched_priority, strerror (errno)); - } - } else - log_printf (LOG_LEVEL_WARNING, "Could not get maximum scheduler priority: %s\n", strerror (errno)); -#else - log_printf(LOG_LEVEL_WARNING, "Scheduler priority left to default value (no OS support)\n"); -#endif + sched_param.sched_priority = 1; +// res = pthread_setschedparam (conn_io->thread, SCHED_RR, &sched_param); - ufd.fd = conn_info->fd; + ufd.fd = conn_io->fd; for (;;) { retry_poll: - service = conn_info->service; - ufd.events = conn_info->events; + conn_info = conn_io->conn_info; + conn_io_refcnt_inc (conn_io); + conn_info_refcnt_inc (conn_info); + + ufd.events = conn_io->events; ufd.revents = 0; fds = poll (&ufd, 1, -1); - - conn_info_mutex_lock (conn_info, service); - - switch (conn_info->state) { - case CONN_STATE_SECURITY: - conn_info_mutex_unlock (conn_info, service); - pthread_mutex_destroy (conn_info->shared_mutex); - free (conn_info->shared_mutex); - conn_info_destroy (conn); - pthread_exit (0); - break; - - case CONN_STATE_REQUESTED: - case CONN_STATE_CLOSED: - res = libais_disconnect (conn); - if (res != 0) { - conn_info_mutex_unlock (conn_info, service); - goto retry_poll; - } - break; - - case CONN_STATE_DISCONNECTED: - rel_mutex = conn_info->shared_mutex; - private_data = conn_info->private_data; - cinfo_partner = conn_info->conn_info_partner; - conn_info_destroy (conn); - if (service == SOCKET_SERVICE_INIT) { - pthread_mutex_unlock (&conn_info->mutex); - } else { - pthread_mutex_unlock (rel_mutex); - } - if (cinfo_partner == NULL) { - pthread_mutex_destroy (rel_mutex); - free (rel_mutex); - free (private_data); - } - pthread_exit (0); - /* - * !! NOTE !! this is the exit point for this thread - */ - break; - - default: - break; - } - if (fds == -1) { - conn_info_mutex_unlock (conn_info, service); + conn_io_refcnt_dec (conn_io); + conn_info_refcnt_dec (conn_info); goto retry_poll; } ipc_serialize_lock_fn (); if (fds == 1 && ufd.revents) { - if (ufd.revents & (POLLERR|POLLHUP)) { + if ((ufd.revents & (POLLERR|POLLHUP)) || + (conn_info && + conn_info->state == CONN_INFO_STATE_DISCONNECT_REQUESTED)) { + disconnect_request (conn_info); - libais_disconnect_request (conn_info); - - conn_info_mutex_unlock (conn_info, service); + conn_io_refcnt_dec (conn_io); + conn_info_refcnt_dec (conn_info); ipc_serialize_unlock_fn (); - continue; + break; /* from for */ } if (ufd.revents & POLLOUT) { - conn_info_outq_flush (conn_info); + conn_io_outq_flush (conn_io); } if ((ufd.revents & POLLIN) == POLLIN) { - libais_deliver (conn_info); + conn_io_deliver (conn_io); } - ipc_flow_control (conn_info); + /* + * IPC initializiation failed because response fd + * disconnected before it was linked to dispatch fd + */ + if (conn_io->state == CONN_IO_STATE_INIT_FAILED) { + conn_io_destroy (conn_io); + conn_info_refcnt_dec (conn_info); + ipc_serialize_unlock_fn (); + pthread_exit (0); + } + /* + * IPC initializiation failed because response fd + * disconnected before it was linked to dispatch fd + */ + if (conn_io->state == CONN_IO_STATE_INIT_FAILED) { + break; + } +// ipc_flow_control (conn_info); + } ipc_serialize_unlock_fn (); - conn_info_mutex_unlock (conn_info, service); + + conn_io_refcnt_dec (conn_io); + conn_info_refcnt_dec (conn_info); } + ipc_serialize_lock_fn (); + /* + * IPC initializiation failed because response fd + * disconnected before it was linked to dispatch fd + */ + if (conn_io->conn_info == NULL || conn_io->state == CONN_IO_STATE_INIT_FAILED) { + conn_io_destroy (conn_io); + conn_info_refcnt_dec (conn_info); + ipc_serialize_unlock_fn (); + pthread_exit (0); + } + + conn_info = conn_io->conn_info; + + /* + * This is the response conn_io + */ + if (conn_info->conn_io_response == conn_io) { + for (;;) { + if (conn_io_refcnt_value (conn_io) == 0) { + conn_io->conn_info = NULL; + conn_io_destroy (conn_io); + conn_info_refcnt_dec (conn_info); + ipc_serialize_unlock_fn (); + pthread_exit (0); + } + usleep (1000); + printf ("sleep 1\n"); + } + } /* response conn_io */ + + /* + * This is the dispatch conn_io + */ + if (conn_io->conn_info->conn_io_dispatch == conn_io) { + for (;;) { + if (conn_io_refcnt_value (conn_io) == 0) { + res = 0; // TODO + /* + * Execute the library exit function + */ + if (conn_io->conn_info->lib_exit_fn) { + res = conn_io->conn_info->lib_exit_fn (conn_info); + } + if (res == 0) { + if (conn_io->conn_info->flow_control_enabled == 1) { +// openais_flow_control_disable ( +// conn_info->flow_control_handle); + } + conn_io->conn_info = NULL; + conn_io_destroy (conn_io); + conn_info_refcnt_dec (conn_info); + ipc_serialize_unlock_fn (); + pthread_exit (0); + } + } /* refcnt == 0 */ + usleep (1000); + printf ("sleep 2\n"); + } /* for (;;) */ + } /* dispatch conn_io */ + + /* * This code never reached */ return (0); } -#if defined(OPENAIS_LINUX) +#if defined(OPENAIS_LINUX) || defined(OPENAIS_SOLARIS) /* SUN_LEN is broken for abstract namespace */ #define AIS_SUN_LEN(a) sizeof(*(a)) +#else +#define AIS_SUN_LEN(a) SUN_LEN(a) +#endif +#if defined(OPENAIS_LINUX) char *socketname = "libais.socket"; #else -#define AIS_SUN_LEN(a) SUN_LEN(a) - char *socketname = "/var/run/libais.socket"; #endif +#ifdef COMPILOE_OUT static void ipc_flow_control (struct conn_info *conn_info) { unsigned int entries_used; @@ -646,8 +793,9 @@ } } } +#endif -static int conn_info_outq_flush (struct conn_info *conn_info) { +static int conn_io_outq_flush (struct conn_io *conn_io) { struct queue *outq; int res = 0; struct outq_item *queue_item; @@ -655,46 +803,51 @@ struct iovec iov_send; char *msg_addr; - if (!libais_connection_active (conn_info)) { - return (-1); - } - outq = &conn_info->outq; + outq = &conn_io->outq; msg_send.msg_iov = &iov_send; msg_send.msg_name = 0; msg_send.msg_namelen = 0; msg_send.msg_iovlen = 1; +#ifndef OPENAIS_SOLARIS msg_send.msg_control = 0; msg_send.msg_controllen = 0; msg_send.msg_flags = 0; +#else + msg_send.msg_accrights = 0; + msg_send.msg_accrightslen = 0; +#endif + pthread_mutex_lock (&conn_io->mutex); while (!queue_is_empty (outq)) { queue_item = queue_item_get (outq); msg_addr = (char *)queue_item->msg; - msg_addr = &msg_addr[conn_info->byte_start]; + msg_addr = &msg_addr[conn_io->byte_start]; iov_send.iov_base = msg_addr; - iov_send.iov_len = queue_item->mlen - conn_info->byte_start; + iov_send.iov_len = queue_item->mlen - conn_io->byte_start; retry_sendmsg: - res = sendmsg (conn_info->fd, &msg_send, MSG_NOSIGNAL); + res = sendmsg (conn_io->fd, &msg_send, MSG_NOSIGNAL); if (res == -1 && errno == EINTR) { goto retry_sendmsg; } if (res == -1 && errno == EAGAIN) { + pthread_mutex_unlock (&conn_io->mutex); return (0); } if (res == -1 && errno == EPIPE) { - libais_disconnect_request (conn_info); + disconnect_request (conn_io->conn_info); + pthread_mutex_unlock (&conn_io->mutex); return (0); } if (res == -1) { - printf ("ERRNO is %d\n", errno); assert (0); /* some other unhandled error here */ } - if (res + conn_info->byte_start != queue_item->mlen) { - conn_info->byte_start += res; + if (res + conn_io->byte_start != queue_item->mlen) { + conn_io->byte_start += res; + pthread_mutex_unlock (&conn_io->mutex); return (0); } @@ -702,14 +855,15 @@ * Message sent, try sending another message */ queue_item_remove (outq); - conn_info->byte_start = 0; + conn_io->byte_start = 0; free (queue_item->msg); } /* while queue not empty */ if (queue_is_empty (outq)) { - conn_info->events = POLLIN|POLLNVAL; + conn_io->events = POLLIN|POLLNVAL; } + pthread_mutex_unlock (&conn_io->mutex); return (0); } @@ -720,7 +874,7 @@ char buf[4096]; }; -static void libais_deliver (struct conn_info *conn_info) +static void conn_io_deliver (struct conn_io *conn_io) { int res; mar_req_header_t *header; @@ -732,9 +886,6 @@ char cmsg_cred[CMSG_SPACE (sizeof (struct ucred))]; struct ucred *cred; int on = 0; -#else - uid_t euid; - gid_t egid; #endif int send_ok = 0; int send_ok_joined = 0; @@ -745,9 +896,10 @@ msg_recv.msg_iovlen = 1; msg_recv.msg_name = 0; msg_recv.msg_namelen = 0; +#ifndef OPENAIS_SOLARIS msg_recv.msg_flags = 0; - if (conn_info->authenticated) { + if (conn_io->state == CONN_IO_STATE_AUTHENTICATED) { msg_recv.msg_control = 0; msg_recv.msg_controllen = 0; } else { @@ -756,24 +908,54 @@ msg_recv.msg_controllen = sizeof (cmsg_cred); #else euid = -1; egid = -1; - if (getpeereid(conn_info->fd, &euid, &egid) != -1 && + if (getpeereid(conn_io->fd, &euid, &egid) != -1 && (euid == 0 || egid == g_gid_valid)) { - conn_info->authenticated = 1; + conn_io->state = CONN_IO_STATE_AUTHENTICATED; } - if (conn_info->authenticated == 0) { + if (conn_io->state == CONN_IO_STATE_INITIALIZING) { log_printf (LOG_LEVEL_SECURITY, "Connection not authenticated because gid is %d, expecting %d\n", egid, g_gid_valid); } #endif } - iov_recv.iov_base = &conn_info->inb[conn_info->inb_start]; - iov_recv.iov_len = (SIZEINB) - conn_info->inb_start; - if (conn_info->inb_inuse == SIZEINB) { + #else /* OPENAIS_SOLARIS */ + msg_recv.msg_accrights = 0; + msg_recv.msg_accrightslen = 0; + + + if (! conn_info->authenticated) { + #ifdef HAVE_GETPEERUCRED + ucred_t *uc; + uid_t euid = -1; + gid_t egid = -1; + if (getpeerucred(conn_info->fd, &uc) == 0) { + euid = ucred_geteuid(uc); + egid = ucred_getegid(uc); + if ((euid == 0) || (egid == g_gid_valid)) { + conn_info->authenticated = 1; + } + ucred_free(uc); + } + if (conn_info->authenticated == 0) { + log_printf (LOG_LEVEL_SECURITY, "Connection not authenticated because gid is %d, expecting %d\n", (int)egid, g_gid_valid); + } + #else + log_printf (LOG_LEVEL_SECURITY, "Connection not authenticated " + "because platform does not support " + "authentication with sockets, continuing " + "with a fake authentication\n"); + conn_info->authenticated = 1; + #endif + } + #endif + iov_recv.iov_base = &conn_io->inb[conn_io->inb_start]; + iov_recv.iov_len = (SIZEINB) - conn_io->inb_start; + if (conn_io->inb_inuse == SIZEINB) { return; } retry_recv: - res = recvmsg (conn_info->fd, &msg_recv, MSG_NOSIGNAL); + res = recvmsg (conn_io->fd, &msg_recv, MSG_NOSIGNAL); if (res == -1 && errno == EINTR) { goto retry_recv; } else @@ -794,17 +976,17 @@ * Authenticate if this connection has not been authenticated */ #ifdef OPENAIS_LINUX - if (conn_info->authenticated == 0) { + if (conn_io->state == CONN_IO_STATE_INITIALIZING) { cmsg = CMSG_FIRSTHDR (&msg_recv); assert (cmsg); cred = (struct ucred *)CMSG_DATA (cmsg); if (cred) { if (cred->uid == 0 || cred->gid == g_gid_valid) { - setsockopt(conn_info->fd, SOL_SOCKET, SO_PASSCRED, &on, sizeof (on)); - conn_info->authenticated = 1; + setsockopt(conn_io->fd, SOL_SOCKET, SO_PASSCRED, &on, sizeof (on)); + conn_io->state = CONN_IO_STATE_AUTHENTICATED; } } - if (conn_info->authenticated == 0) { + if (conn_io->state == CONN_IO_STATE_INITIALIZING) { log_printf (LOG_LEVEL_SECURITY, "Connection not authenticated because gid is %d, expecting %d\n", cred->gid, g_gid_valid); } } @@ -813,23 +995,23 @@ * Dispatch all messages received in recvmsg that can be dispatched * sizeof (mar_req_header_t) needed at minimum to do any processing */ - conn_info->inb_inuse += res; - conn_info->inb_start += res; + conn_io->inb_inuse += res; + conn_io->inb_start += res; - while (conn_info->inb_inuse >= sizeof (mar_req_header_t) && res != -1) { - header = (mar_req_header_t *)&conn_info->inb[conn_info->inb_start - conn_info->inb_inuse]; + while (conn_io->inb_inuse >= sizeof (mar_req_header_t) && res != -1) { + header = (mar_req_header_t *)&conn_io->inb[conn_io->inb_start - conn_io->inb_inuse]; - if (header->size > conn_info->inb_inuse) { + if (header->size > conn_io->inb_inuse) { break; } - service = conn_info->service; + service = conn_io->service; /* * If this service is in init phase, initialize service * else handle message using service service */ - if (service == SOCKET_SERVICE_INIT) { - res = ais_init_service[header->id] (conn_info, header); + if (conn_io->service == SOCKET_SERVICE_INIT) { + res = ais_init_service[header->id] (conn_io, header); } else { /* * Not an init service, but a standard service @@ -846,7 +1028,7 @@ * to queue a message, otherwise tell the library we are busy and to * try again later */ - send_ok_joined_iovec.iov_base = header; + send_ok_joined_iovec.iov_base = (char *)header; send_ok_joined_iovec.iov_len = header->size; send_ok_joined = totempg_groups_send_ok_joined (openais_group_handle, &send_ok_joined_iovec, 1); @@ -859,7 +1041,7 @@ (sync_in_process() == 0))); if (send_ok) { - ais_service[service]->lib_service[header->id].lib_handler_fn(conn_info, header); + ais_service[service]->lib_service[header->id].lib_handler_fn(conn_io->conn_info, header); } else { /* @@ -870,33 +1052,33 @@ res_overlay.header.id = ais_service[service]->lib_service[header->id].response_id; res_overlay.header.error = SA_AIS_ERR_TRY_AGAIN; - openais_conn_send_response ( - conn_info, + conn_io_send ( + conn_io, &res_overlay, res_overlay.header.size); } } - conn_info->inb_inuse -= header->size; + conn_io->inb_inuse -= header->size; } /* while */ - if (conn_info->inb_inuse == 0) { - conn_info->inb_start = 0; + if (conn_io->inb_inuse == 0) { + conn_io->inb_start = 0; } else -// BUG if (connections[conn_info->fd].inb_start + connections[conn_info->fd].inb_inuse >= SIZEINB) { - if (conn_info->inb_start >= SIZEINB) { +// BUG if (connections[conn_io->fd].inb_start + connections[conn_io->fd].inb_inuse >= SIZEINB) { + if (conn_io->inb_start >= SIZEINB) { /* * If in buffer is full, move it back to start */ - memmove (conn_info->inb, - &conn_info->inb[conn_info->inb_start - conn_info->inb_inuse], - sizeof (char) * conn_info->inb_inuse); - conn_info->inb_start = conn_info->inb_inuse; + memmove (conn_io->inb, + &conn_io->inb[conn_io->inb_start - conn_io->inb_inuse], + sizeof (char) * conn_io->inb_inuse); + conn_io->inb_start = conn_io->inb_inuse; } return; } -static int poll_handler_libais_accept ( +static int poll_handler_accept ( poll_handle handle, int fd, int revent, @@ -944,7 +1126,7 @@ log_printf (LOG_LEVEL_DEBUG, "connection received from libais client %d.\n", new_fd); - res = conn_info_create (new_fd); + res = conn_io_create (new_fd); if (res != 0) { close (new_fd); } @@ -994,8 +1176,6 @@ struct sockaddr_un un_addr; int res; - log_init ("IPC"); - ipc_serialize_lock_fn = serialize_lock_fn; ipc_serialize_unlock_fn = serialize_unlock_fn; @@ -1009,7 +1189,7 @@ openais_exit_error (AIS_DONE_LIBAIS_SOCKET); }; - totemip_nosigpipe(libais_server_fd); + totemip_nosigpipe (libais_server_fd); res = fcntl (libais_server_fd, F_SETFL, O_NONBLOCK); if (res == -1) { log_printf (LOG_LEVEL_ERROR, "Could not set non-blocking operation on server socket: %s\n", strerror (errno)); @@ -1041,7 +1221,7 @@ * Setup libais connection dispatch routine */ poll_dispatch_add (aisexec_poll_handle, libais_server_fd, - POLLIN, 0, poll_handler_libais_accept); + POLLIN, 0, poll_handler_accept); g_gid_valid = gid_valid; @@ -1063,33 +1243,14 @@ { struct conn_info *conn_info = (struct conn_info *)conn; - if (conn != NULL) { - return ((void *)conn_info->private_data); - } else { - return NULL; - } + return (conn_info->private_data); } -/* - * Get the conn info partner connection - */ -void *openais_conn_partner_get (void *conn) -{ - struct conn_info *conn_info = (struct conn_info *)conn; - - if (conn != NULL) { - return ((void *)conn_info->conn_info_partner); - } else { - return NULL; - } -} - -int openais_conn_send_response ( - void *conn, +static int conn_io_send ( + struct conn_io *conn_io, void *msg, int mlen) { - struct queue *outq; char *cmsg; int res = 0; int queue_empty; @@ -1098,47 +1259,47 @@ struct msghdr msg_send; struct iovec iov_send; char *msg_addr; - struct conn_info *conn_info = (struct conn_info *)conn; - if (conn_info == NULL) { - return -1; + if (conn_io == NULL) { + assert (0); } - if (!libais_connection_active (conn_info)) { - return (-1); - } +// ipc_flow_control (conn_info); - ipc_flow_control (conn_info); - - outq = &conn_info->outq; - msg_send.msg_iov = &iov_send; msg_send.msg_name = 0; msg_send.msg_namelen = 0; msg_send.msg_iovlen = 1; +#ifndef OPENAIS_SOLARIS msg_send.msg_control = 0; msg_send.msg_controllen = 0; msg_send.msg_flags = 0; +#else + msg_send.msg_accrights = 0; + msg_send.msg_accrightslen = 0; +#endif - if (queue_is_full (outq)) { + pthread_mutex_lock (&conn_io->mutex); + if (queue_is_full (&conn_io->outq)) { /* * Start a disconnect if we have not already started one * and report that the outgoing queue is full */ log_printf (LOG_LEVEL_ERROR, "Library queue is full, disconnecting library connection.\n"); - libais_disconnect_request (conn_info); + disconnect_request (conn_io->conn_info); + pthread_mutex_unlock (&conn_io->mutex); return (-1); } - while (!queue_is_empty (outq)) { - queue_item = queue_item_get (outq); + while (!queue_is_empty (&conn_io->outq)) { + queue_item = queue_item_get (&conn_io->outq); msg_addr = (char *)queue_item->msg; - msg_addr = &msg_addr[conn_info->byte_start]; + msg_addr = &msg_addr[conn_io->byte_start]; iov_send.iov_base = msg_addr; - iov_send.iov_len = queue_item->mlen - conn_info->byte_start; + iov_send.iov_len = queue_item->mlen - conn_io->byte_start; retry_sendmsg: - res = sendmsg (conn_info->fd, &msg_send, MSG_NOSIGNAL); + res = sendmsg (conn_io->fd, &msg_send, MSG_NOSIGNAL); if (res == -1 && errno == EINTR) { goto retry_sendmsg; } @@ -1146,29 +1307,30 @@ break; /* outgoing kernel queue full */ } if (res == -1 && errno == EPIPE) { - libais_disconnect_request (conn_info); + disconnect_request (conn_io->conn_info); + pthread_mutex_unlock (&conn_io->mutex); return (0); } if (res == -1) { - assert (0); +// assert (0); break; /* some other error, stop trying to send message */ } - if (res + conn_info->byte_start != queue_item->mlen) { - conn_info->byte_start += res; + if (res + conn_io->byte_start != queue_item->mlen) { + conn_io->byte_start += res; break; } /* * Message sent, try sending another message */ - queue_item_remove (outq); - conn_info->byte_start = 0; + queue_item_remove (&conn_io->outq); + conn_io->byte_start = 0; free (queue_item->msg); } /* while queue not empty */ res = -1; - queue_empty = queue_is_empty (outq); + queue_empty = queue_is_empty (&conn_io->outq); /* * Send request message */ @@ -1177,21 +1339,21 @@ iov_send.iov_base = msg; iov_send.iov_len = mlen; retry_sendmsg_two: - res = sendmsg (conn_info->fd, &msg_send, MSG_NOSIGNAL); + res = sendmsg (conn_io->fd, &msg_send, MSG_NOSIGNAL); if (res == -1 && errno == EINTR) { goto retry_sendmsg_two; } if (res == -1 && errno == EAGAIN) { - conn_info->byte_start = 0; - conn_info->events = POLLIN|POLLNVAL; + conn_io->byte_start = 0; + conn_io->events = POLLIN|POLLNVAL; } if (res != -1) { if (res != mlen) { - conn_info->byte_start += res; + conn_io->byte_start += res; res = -1; } else { - conn_info->byte_start = 0; - conn_info->events = POLLIN|POLLNVAL; + conn_io->byte_start = 0; + conn_io->events = POLLIN|POLLNVAL; } } } @@ -1203,21 +1365,23 @@ cmsg = malloc (mlen); if (cmsg == 0) { log_printf (LOG_LEVEL_ERROR, "Library queue couldn't allocate a message, disconnecting library connection.\n"); - libais_disconnect_request (conn_info); + disconnect_request (conn_io->conn_info); + pthread_mutex_unlock (&conn_io->mutex); return (-1); } queue_item_out.msg = cmsg; queue_item_out.mlen = mlen; memcpy (cmsg, msg, mlen); - queue_item_add (outq, &queue_item_out); + queue_item_add (&conn_io->outq, &queue_item_out); /* * Send a pthread_kill to interrupt the poll syscall * and start a new poll operation in the thread */ - conn_info->events = POLLIN|POLLOUT|POLLNVAL; - pthread_kill (conn_info->thread, SIGUSR1); + conn_io->events = POLLIN|POLLOUT|POLLNVAL; + pthread_kill (conn_io->thread, SIGUSR1); } + pthread_mutex_unlock (&conn_io->mutex); return (0); } @@ -1238,7 +1402,6 @@ id_len, flow_control_state_set_fn, context); - conn_info->conn_info_partner->flow_control_handle = conn_info->flow_control_handle; } void openais_ipc_flow_control_destroy ( @@ -1279,3 +1442,18 @@ pthread_mutex_unlock (&conn_info->flow_control_mutex); } + + +int openais_response_send (void *conn, void *msg, int mlen) +{ + struct conn_info *conn_info = (struct conn_info *)conn; + + return (conn_io_send (conn_info->conn_io_response, msg, mlen)); +} + +int openais_dispatch_send (void *conn, void *msg, int mlen) +{ + struct conn_info *conn_info = (struct conn_info *)conn; + + return (conn_io_send (conn_info->conn_io_dispatch, msg, mlen)); +} Index: exec/ipc.h =================================================================== --- exec/ipc.h (revision 1498) +++ exec/ipc.h (revision 1499) @@ -42,12 +42,16 @@ extern int message_source_is_local (mar_message_source_t *source); -extern void *openais_conn_partner_get (void *conn); - extern void *openais_conn_private_data_get (void *conn); -extern int openais_conn_send_response (void *conn, void *msg, int mlen); +extern int openais_response_send (void *conn, void *msg, int mlen); +extern int openais_dispatch_send (void *conn, void *msg, int mlen); + +extern void openais_conn_info_refcnt_dec (void *conn); + +extern void openais_conn_info_refcnt_inc (void *conn); + extern void openais_ipc_init ( void (*serialize_lock_fn) (void), void (*serialize_unlock_fn) (void), Index: exec/amfcomp.c =================================================================== --- exec/amfcomp.c (revision 1498) +++ exec/amfcomp.c (revision 1499) @@ -547,8 +547,8 @@ AMF_RESPONSE_COMPONENTTERMINATECALLBACK, component_terminate_callback_data); - openais_conn_send_response ( - openais_conn_partner_get (comp->conn), + openais_dispatch_send ( + comp->conn, &res_lib, sizeof (struct res_lib_amf_componentterminatecallback)); @@ -817,8 +817,8 @@ res_lib_amf_csiremovecallback.csiFlags = 0; - openais_conn_send_response ( - openais_conn_partner_get (comp->conn), + openais_dispatch_send ( + comp->conn, &res_lib_amf_csiremovecallback, sizeof (struct res_lib_amf_csiremovecallback)); } @@ -1011,8 +1011,8 @@ TRACE8 ("sending healthcheck request to component %s", res_lib.compName.value); - openais_conn_send_response ( - openais_conn_partner_get (healthcheck->comp->conn), + openais_dispatch_send ( + healthcheck->comp->conn, &res_lib, sizeof (struct res_lib_amf_healthcheckcallback)); } @@ -1117,8 +1117,7 @@ res_lib->invocation = invocation_create (AMF_RESPONSE_CSISETCALLBACK, csi_assignment); - openais_conn_send_response ( - openais_conn_partner_get (comp->conn), res_lib, res_lib->header.size); + openais_dispatch_send (comp->conn, res_lib, res_lib->header.size); free(p); } @@ -1154,7 +1153,7 @@ res_lib.header.size = sizeof (struct res_lib_amf_componenterrorreport); res_lib.header.id = MESSAGE_RES_AMF_COMPONENTERRORREPORT; res_lib.header.error = SA_AIS_OK; - openais_conn_send_response (comp->conn, &res_lib, sizeof (res_lib)); + openais_dispatch_send (comp->conn, &res_lib, sizeof (res_lib)); } /* report to SU and let it handle the problem */ Index: exec/evt.c =================================================================== --- exec/evt.c (revision 1498) +++ exec/evt.c (revision 1499) @@ -1853,8 +1853,10 @@ res.evd_head.size = sizeof(res); res.evd_head.id = MESSAGE_RES_EVT_AVAILABLE; res.evd_head.error = SA_AIS_OK; - openais_conn_send_response(openais_conn_partner_get(conn), - &res, sizeof(res)); + openais_dispatch_send ( + conn, + &res, + sizeof(res)); } } @@ -2265,7 +2267,7 @@ res.ico_head.size = sizeof(res); res.ico_head.id = MESSAGE_RES_EVT_OPEN_CHANNEL; res.ico_head.error = error; - openais_conn_send_response(conn, &res, sizeof(res)); + openais_response_send(conn, &res, sizeof(res)); } /* @@ -2322,7 +2324,7 @@ res.ico_head.size = sizeof(res); res.ico_head.id = MESSAGE_RES_EVT_OPEN_CHANNEL; res.ico_head.error = error; - openais_conn_send_response(conn, &res, sizeof(res)); + openais_dispatch_send(conn, &res, sizeof(res)); } @@ -2415,7 +2417,7 @@ res.icc_head.size = sizeof(res); res.icc_head.id = MESSAGE_RES_EVT_CLOSE_CHANNEL; res.icc_head.error = ((ret == 0) ? SA_AIS_OK : SA_AIS_ERR_BAD_HANDLE); - openais_conn_send_response(conn, &res, sizeof(res)); + openais_response_send(conn, &res, sizeof(res)); } /* @@ -2487,7 +2489,7 @@ res.iuc_head.size = sizeof(res); res.iuc_head.id = MESSAGE_RES_EVT_UNLINK_CHANNEL; res.iuc_head.error = error; - openais_conn_send_response(conn, &res, sizeof(res)); + openais_response_send(conn, &res, sizeof(res)); } /* @@ -2591,7 +2593,7 @@ res.ics_head.size = sizeof(res); res.ics_head.id = MESSAGE_RES_EVT_SUBSCRIBE; res.ics_head.error = error; - openais_conn_send_response(conn, &res, sizeof(res)); + openais_response_send(conn, &res, sizeof(res)); /* * See if an existing event with a retention time @@ -2624,7 +2626,7 @@ res.ics_head.size = sizeof(res); res.ics_head.id = MESSAGE_RES_EVT_SUBSCRIBE; res.ics_head.error = error; - openais_conn_send_response(conn, &res, sizeof(res)); + openais_response_send(conn, &res, sizeof(res)); } /* @@ -2691,7 +2693,7 @@ res.icu_head.size = sizeof(res); res.icu_head.id = MESSAGE_RES_EVT_UNSUBSCRIBE; res.icu_head.error = error; - openais_conn_send_response(conn, &res, sizeof(res)); + openais_response_send(conn, &res, sizeof(res)); } /* @@ -2763,7 +2765,7 @@ res.iep_head.id = MESSAGE_RES_EVT_PUBLISH; res.iep_head.error = error; res.iep_event_id = event_id; - openais_conn_send_response(conn, &res, sizeof(res)); + openais_response_send(conn, &res, sizeof(res)); } /* @@ -2827,7 +2829,7 @@ res.iec_head.size = sizeof(res); res.iec_head.id = MESSAGE_RES_EVT_CLEAR_RETENTIONTIME; res.iec_head.error = error; - openais_conn_send_response(conn, &res, sizeof(res)); + openais_response_send(conn, &res, sizeof(res)); } @@ -2866,7 +2868,7 @@ edp->ed_event.led_head.id = MESSAGE_RES_EVT_EVENT_DATA; edp->ed_event.led_head.error = SA_AIS_OK; free(cel); - openais_conn_send_response(conn, &edp->ed_event, + openais_response_send(conn, &edp->ed_event, edp->ed_event.led_head.size); free_event_data(edp); goto data_get_done; @@ -2876,7 +2878,7 @@ res.led_head.size = sizeof(res.led_head); res.led_head.id = MESSAGE_RES_EVT_EVENT_DATA; res.led_head.error = SA_AIS_ERR_NOT_EXIST; - openais_conn_send_response(conn, &res, res.led_head.size); + openais_response_send(conn, &res, res.led_head.size); /* * See if there are any events that the app doesn't know about @@ -3038,7 +3040,7 @@ struct unlink_chan_pending *ucp; struct retention_time_clear_pending *rtc; struct libevt_pd *esip = - openais_conn_private_data_get(openais_conn_partner_get(conn)); + openais_conn_private_data_get(conn); log_printf(LOG_LEVEL_DEBUG, "saEvtFinalize (Event exit request)\n"); log_printf(LOG_LEVEL_DEBUG, "saEvtFinalize %d evts on list\n", @@ -3431,7 +3433,7 @@ res.ico_head.id = MESSAGE_RES_EVT_OPEN_CHANNEL; res.ico_head.error = SA_AIS_ERR_TIMEOUT; ocp->ocp_invocation = OPEN_TIMED_OUT; - openais_conn_send_response(ocp->ocp_conn, &res, sizeof(res)); + openais_response_send(ocp->ocp_conn, &res, sizeof(res)); } /* @@ -3521,15 +3523,14 @@ resa.ica_channel_handle = handle; resa.ica_c_handle = ocp->ocp_c_handle; resa.ica_invocation = ocp->ocp_invocation; - openais_conn_send_response(openais_conn_partner_get(ocp->ocp_conn), - &resa, sizeof(resa)); + openais_dispatch_send(ocp->ocp_conn, &resa, sizeof(resa)); } else { struct res_evt_channel_open res; res.ico_head.size = sizeof(res); res.ico_head.id = MESSAGE_RES_EVT_OPEN_CHANNEL; res.ico_head.error = (ret == 0 ? SA_AIS_OK : SA_AIS_ERR_BAD_HANDLE); res.ico_channel_handle = handle; - openais_conn_send_response(ocp->ocp_conn, &res, sizeof(res)); + openais_response_send(ocp->ocp_conn, &res, sizeof(res)); } if (timer_del_status == 0) { @@ -3554,7 +3555,7 @@ res.iuc_head.size = sizeof(res); res.iuc_head.id = MESSAGE_RES_EVT_UNLINK_CHANNEL; res.iuc_head.error = SA_AIS_OK; - openais_conn_send_response(ucp->ucp_conn, &res, sizeof(res)); + openais_response_send(ucp->ucp_conn, &res, sizeof(res)); free(ucp); } @@ -3574,7 +3575,7 @@ res.iec_head.size = sizeof(res); res.iec_head.id = MESSAGE_RES_EVT_CLEAR_RETENTIONTIME; res.iec_head.error = ret; - openais_conn_send_response(rtc->rtc_conn, &res, sizeof(res)); + openais_response_send(rtc->rtc_conn, &res, sizeof(res)); list_del(&rtc->rtc_entry); free(rtc); Index: exec/ckpt.c =================================================================== --- exec/ckpt.c (revision 1498) +++ exec/ckpt.c (revision 1499) @@ -1401,12 +1401,12 @@ res_lib_ckpt_checkpointopenasync.ckpt_id = checkpoint->ckpt_id; } - openais_conn_send_response ( + openais_response_send ( req_exec_ckpt_checkpointopen->source.conn, &res_lib_ckpt_checkpointopenasync, sizeof (struct res_lib_ckpt_checkpointopenasync)); - openais_conn_send_response ( - openais_conn_partner_get (req_exec_ckpt_checkpointopen->source.conn), + openais_dispatch_send ( + req_exec_ckpt_checkpointopen->source.conn, &res_lib_ckpt_checkpointopenasync, sizeof (struct res_lib_ckpt_checkpointopenasync)); } else { @@ -1420,7 +1420,7 @@ } res_lib_ckpt_checkpointopen.header.error = error; - openais_conn_send_response ( + openais_response_send ( req_exec_ckpt_checkpointopen->source.conn, &res_lib_ckpt_checkpointopen, sizeof (struct res_lib_ckpt_checkpointopen)); @@ -1593,8 +1593,10 @@ res_lib_ckpt_checkpointclose.header.size = sizeof (struct res_lib_ckpt_checkpointclose); res_lib_ckpt_checkpointclose.header.id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTCLOSE; res_lib_ckpt_checkpointclose.header.error = error; - openais_conn_send_response (req_exec_ckpt_checkpointclose->source.conn, - &res_lib_ckpt_checkpointclose, sizeof (struct res_lib_ckpt_checkpointclose)); + openais_response_send ( + req_exec_ckpt_checkpointclose->source.conn, + &res_lib_ckpt_checkpointclose, + sizeof (struct res_lib_ckpt_checkpointclose)); } /* @@ -1646,7 +1648,7 @@ res_lib_ckpt_checkpointunlink.header.size = sizeof (struct res_lib_ckpt_checkpointunlink); res_lib_ckpt_checkpointunlink.header.id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTUNLINK; res_lib_ckpt_checkpointunlink.header.error = error; - openais_conn_send_response ( + openais_response_send ( req_exec_ckpt_checkpointunlink->source.conn, &res_lib_ckpt_checkpointunlink, sizeof (struct res_lib_ckpt_checkpointunlink)); @@ -1694,7 +1696,7 @@ res_lib_ckpt_checkpointretentiondurationset.header.id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTRETENTIONDURATIONSET; res_lib_ckpt_checkpointretentiondurationset.header.error = error; - openais_conn_send_response ( + openais_response_send ( req_exec_ckpt_checkpointretentiondurationset->source.conn, &res_lib_ckpt_checkpointretentiondurationset, sizeof (struct res_lib_ckpt_checkpointretentiondurationset)); @@ -1900,7 +1902,8 @@ res_lib_ckpt_sectioncreate.header.id = MESSAGE_RES_CKPT_CHECKPOINT_SECTIONCREATE; res_lib_ckpt_sectioncreate.header.error = error; - openais_conn_send_response (req_exec_ckpt_sectioncreate->source.conn, + openais_response_send ( + req_exec_ckpt_sectioncreate->source.conn, &res_lib_ckpt_sectioncreate, sizeof (struct res_lib_ckpt_sectioncreate)); } @@ -1965,7 +1968,7 @@ res_lib_ckpt_sectiondelete.header.id = MESSAGE_RES_CKPT_CHECKPOINT_SECTIONDELETE; res_lib_ckpt_sectiondelete.header.error = error; - openais_conn_send_response ( + openais_response_send ( req_exec_ckpt_sectiondelete->source.conn, &res_lib_ckpt_sectiondelete, sizeof (struct res_lib_ckpt_sectiondelete)); @@ -2058,7 +2061,7 @@ MESSAGE_RES_CKPT_CHECKPOINT_SECTIONEXPIRATIONTIMESET; res_lib_ckpt_sectionexpirationtimeset.header.error = error; - openais_conn_send_response ( + openais_response_send ( req_exec_ckpt_sectionexpirationtimeset->source.conn, &res_lib_ckpt_sectionexpirationtimeset, sizeof (struct res_lib_ckpt_sectionexpirationtimeset)); @@ -2168,7 +2171,7 @@ MESSAGE_RES_CKPT_CHECKPOINT_SECTIONWRITE; res_lib_ckpt_sectionwrite.header.error = error; - openais_conn_send_response ( + openais_response_send ( req_exec_ckpt_sectionwrite->source.conn, &res_lib_ckpt_sectionwrite, sizeof (struct res_lib_ckpt_sectionwrite)); @@ -2265,7 +2268,7 @@ MESSAGE_RES_CKPT_CHECKPOINT_SECTIONOVERWRITE; res_lib_ckpt_sectionoverwrite.header.error = error; - openais_conn_send_response ( + openais_response_send ( req_exec_ckpt_sectionoverwrite->source.conn, &res_lib_ckpt_sectionoverwrite, sizeof (struct res_lib_ckpt_sectionoverwrite)); @@ -2358,7 +2361,7 @@ res_lib_ckpt_sectionread.data_read = section_size; } - openais_conn_send_response ( + openais_response_send ( req_exec_ckpt_sectionread->source.conn, &res_lib_ckpt_sectionread, sizeof (struct res_lib_ckpt_sectionread)); @@ -2369,7 +2372,7 @@ if (error == SA_AIS_OK) { char *sd; sd = (char *)checkpoint_section->section_data; - openais_conn_send_response ( + openais_response_send ( req_exec_ckpt_sectionread->source.conn, &sd[req_exec_ckpt_sectionread->data_offset], section_size); @@ -2578,7 +2581,7 @@ res_lib_ckpt_activereplicaset.header.id = MESSAGE_RES_CKPT_ACTIVEREPLICASET; res_lib_ckpt_activereplicaset.header.error = error; - openais_conn_send_response ( + openais_response_send ( conn, &res_lib_ckpt_activereplicaset, sizeof (struct res_lib_ckpt_activereplicaset)); @@ -2641,7 +2644,7 @@ res_lib_ckpt_checkpointstatusget.header.id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSTATUSGET; res_lib_ckpt_checkpointstatusget.header.error = SA_AIS_ERR_NOT_EXIST; } - openais_conn_send_response ( + openais_response_send ( conn, &res_lib_ckpt_checkpointstatusget, sizeof (struct res_lib_ckpt_checkpointstatusget)); @@ -2967,7 +2970,7 @@ res_lib_ckpt_checkpointsynchronize.header.size = sizeof (struct res_lib_ckpt_checkpointsynchronize); res_lib_ckpt_checkpointsynchronize.header.id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZE; - openais_conn_send_response ( + openais_response_send ( conn, &res_lib_ckpt_checkpointsynchronize, sizeof (struct res_lib_ckpt_checkpointsynchronize)); @@ -2998,13 +3001,13 @@ res_lib_ckpt_checkpointsynchronizeasync.header.id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZEASYNC; res_lib_ckpt_checkpointsynchronizeasync.invocation = req_lib_ckpt_checkpointsynchronizeasync->invocation; - openais_conn_send_response ( + openais_response_send ( conn, &res_lib_ckpt_checkpointsynchronizeasync, sizeof (struct res_lib_ckpt_checkpointsynchronizeasync)); - openais_conn_send_response ( - openais_conn_partner_get (conn), + openais_dispatch_send ( + conn, &res_lib_ckpt_checkpointsynchronizeasync, sizeof (struct res_lib_ckpt_checkpointsynchronizeasync)); } @@ -3133,7 +3136,7 @@ res_lib_ckpt_sectioniterationinitialize.max_section_id_size = checkpoint->checkpoint_creation_attributes.max_section_id_size; - openais_conn_send_response ( + openais_response_send ( conn, &res_lib_ckpt_sectioniterationinitialize, sizeof (struct res_lib_ckpt_sectioniterationinitialize)); @@ -3174,7 +3177,7 @@ res_lib_ckpt_sectioniterationfinalize.header.id = MESSAGE_RES_CKPT_SECTIONITERATIONFINALIZE; res_lib_ckpt_sectioniterationfinalize.header.error = error; - openais_conn_send_response ( + openais_response_send ( conn, &res_lib_ckpt_sectioniterationfinalize, sizeof (struct res_lib_ckpt_sectioniterationfinalize)); @@ -3262,13 +3265,13 @@ res_lib_ckpt_sectioniterationnext.header.id = MESSAGE_RES_CKPT_SECTIONITERATIONNEXT; res_lib_ckpt_sectioniterationnext.header.error = error; - openais_conn_send_response ( + openais_response_send ( conn, &res_lib_ckpt_sectioniterationnext, sizeof (struct res_lib_ckpt_sectioniterationnext)); if (error == SA_AIS_OK) { - openais_conn_send_response ( + openais_response_send ( conn, checkpoint_section->section_descriptor.section_id.id, checkpoint_section->section_descriptor.section_id.id_len); Index: exec/amf.c =================================================================== --- exec/amf.c (revision 1498) +++ exec/amf.c (revision 1499) @@ -480,7 +480,7 @@ res_lib.header.id = MESSAGE_RES_AMF_COMPONENTREGISTER; res_lib.header.size = sizeof (struct res_lib_amf_componentregister); res_lib.header.error = error; - openais_conn_send_response ( + openais_response_send ( comp->conn, &res_lib, sizeof (struct res_lib_amf_componentregister)); } } @@ -544,7 +544,7 @@ res_lib.header.id = MESSAGE_RES_AMF_RESPONSE; res_lib.header.size = sizeof (struct res_lib_amf_response); res_lib.header.error = retval; - openais_conn_send_response (comp->conn, &res_lib, sizeof (res_lib)); + openais_response_send (comp->conn, &res_lib, sizeof (res_lib)); } } @@ -583,7 +583,7 @@ res_lib.header.id = MESSAGE_RES_AMF_COMPONENTREGISTER; res_lib.header.size = sizeof (struct res_lib_amf_componentregister); res_lib.header.error = SA_AIS_ERR_INVALID_PARAM; - openais_conn_send_response ( + openais_response_send ( conn, &res_lib, sizeof (struct res_lib_amf_componentregister)); } } @@ -658,12 +658,13 @@ res_lib.header.id = MESSAGE_RES_AMF_HEALTHCHECKSTART; res_lib.header.size = sizeof (res_lib); res_lib.header.error = error; - openais_conn_send_response (conn, &res_lib, + openais_response_send (conn, &res_lib, sizeof (struct res_lib_amf_healthcheckstart)); } static void message_handler_req_lib_amf_healthcheckconfirm ( - void *conn, void *msg) + void *conn, + void *msg) { struct req_lib_amf_healthcheckconfirm *req_lib = msg; struct res_lib_amf_healthcheckconfirm res_lib; @@ -683,7 +684,7 @@ res_lib.header.id = MESSAGE_RES_AMF_HEALTHCHECKCONFIRM; res_lib.header.size = sizeof (res_lib); res_lib.header.error = error; - openais_conn_send_response (conn, &res_lib, sizeof (res_lib)); + openais_response_send (conn, &res_lib, sizeof (res_lib)); } static void message_handler_req_lib_amf_healthcheckstop ( @@ -706,7 +707,7 @@ res_lib.header.id = MESSAGE_RES_AMF_HEALTHCHECKSTOP; res_lib.header.size = sizeof (res_lib); res_lib.header.error = error; - openais_conn_send_response (conn, &res_lib, sizeof (res_lib)); + openais_response_send (conn, &res_lib, sizeof (res_lib)); } static void message_handler_req_lib_amf_hastateget (void *conn, void *msg) @@ -732,7 +733,7 @@ res_lib.header.size = sizeof (struct res_lib_amf_hastateget); res_lib.header.error = error; - openais_conn_send_response (conn, &res_lib, + openais_response_send (conn, &res_lib, sizeof (struct res_lib_amf_hastateget)); } @@ -788,7 +789,7 @@ if (amfProtectionGroup) { res_lib_amf_protectiongrouptrack.header.error = SA_AIS_OK; } - openais_conn_send_response (conn, &res_lib_amf_protectiongrouptrack, + openais_response_send (conn, &res_lib_amf_protectiongrouptrack, sizeof (struct res_lib_amf_protectiongrouptrack)); if (amfProtectionGroup && @@ -848,7 +849,7 @@ if (track) { res_lib_amf_protectiongrouptrackstop.header.error = SA_AIS_OK; } - openais_conn_send_response (conn, &res_lib_amf_protectiongrouptrackstop, + openais_response_send (conn, &res_lib_amf_protectiongrouptrackstop, sizeof (struct res_lib_amf_protectiongrouptrackstop)); #endif @@ -896,7 +897,7 @@ res_lib.header.size = sizeof (struct res_lib_amf_componenterrorreport); res_lib.header.id = MESSAGE_RES_AMF_COMPONENTERRORREPORT; res_lib.header.error = SA_AIS_ERR_NOT_EXIST; - openais_conn_send_response (conn, &res_lib, + openais_response_send (conn, &res_lib, sizeof (struct res_lib_amf_componenterrorreport)); } } @@ -963,7 +964,7 @@ res_lib.header.id = MESSAGE_RES_AMF_RESPONSE; res_lib.header.size = sizeof (struct res_lib_amf_response); res_lib.header.error = retval; - openais_conn_send_response (conn, &res_lib, sizeof (res_lib)); + openais_response_send (conn, &res_lib, sizeof (res_lib)); } } Index: exec/lck.c =================================================================== --- exec/lck.c (revision 1498) +++ exec/lck.c (revision 1499) @@ -719,12 +719,12 @@ &req_exec_lck_resourceopen->source, sizeof (mar_message_source_t)); - openais_conn_send_response ( + openais_response_send ( req_exec_lck_resourceopen->source.conn, &res_lib_lck_resourceopenasync, sizeof (struct res_lib_lck_resourceopenasync)); - openais_conn_send_response ( - openais_conn_partner_get (req_exec_lck_resourceopen->source.conn), + openais_dispatch_send ( + req_exec_lck_resourceopen->source.conn, &res_lib_lck_resourceopenasync, sizeof (struct res_lib_lck_resourceopenasync)); } else { @@ -738,7 +738,7 @@ &req_exec_lck_resourceopen->source, sizeof (mar_message_source_t)); - openais_conn_send_response (req_exec_lck_resourceopen->source.conn, + openais_response_send (req_exec_lck_resourceopen->source.conn, &res_lib_lck_resourceopen, sizeof (struct res_lib_lck_resourceopen)); } @@ -774,7 +774,7 @@ res_lib_lck_resourceclose.header.size = sizeof (struct res_lib_lck_resourceclose); res_lib_lck_resourceclose.header.id = MESSAGE_RES_LCK_RESOURCECLOSE; res_lib_lck_resourceclose.header.error = error; - openais_conn_send_response ( + openais_response_send ( req_exec_lck_resourceclose->source.conn, &res_lib_lck_resourceclose, sizeof (struct res_lib_lck_resourceclose)); } @@ -801,8 +801,8 @@ res_lib_lck_lockwaitercallback.mode_held = SA_LCK_PR_LOCK_MODE; } - openais_conn_send_response ( - openais_conn_partner_get (resource_lock->callback_source.conn), + openais_dispatch_send ( + resource_lock->callback_source.conn, &res_lib_lck_lockwaitercallback, sizeof (struct res_lib_lck_lockwaitercallback)); } @@ -837,8 +837,8 @@ res_lib_lck_resourcelockasync.lockStatus = resource_lock->lock_status; res_lib_lck_resourcelockasync.invocation = resource_lock->invocation; res_lib_lck_resourcelockasync.lockId = resource_lock->lock_id; - openais_conn_send_response ( - openais_conn_partner_get (source->conn), + openais_dispatch_send ( + source->conn, &res_lib_lck_resourcelockasync, sizeof (struct res_lib_lck_resourcelockasync)); } @@ -861,7 +861,7 @@ res_lib_lck_resourcelock.header.error = error; res_lib_lck_resourcelock.resource_lock = (void *)resource_lock; res_lib_lck_resourcelock.lockStatus = resource_lock->lock_status; - openais_conn_send_response (source->conn, + openais_response_send (source->conn, &res_lib_lck_resourcelock, sizeof (struct res_lib_lck_resourcelock)); } @@ -1133,14 +1133,11 @@ * Deliver async response to library */ req_exec_lck_resourcelock->source.conn = - openais_conn_partner_get (req_exec_lck_resourcelock->source.conn); + req_exec_lck_resourcelock->source.conn; resource_lock_async_deliver ( &req_exec_lck_resourcelock->source, resource_lock, SA_AIS_OK); -// TODO why is this twice ? - req_exec_lck_resourcelock->source.conn = - openais_conn_partner_get (req_exec_lck_resourcelock->source.conn); } error_exit: @@ -1184,11 +1181,11 @@ res_lib_lck_resourceunlockasync.invocation = req_exec_lck_resourceunlock->invocation; - openais_conn_send_response ( - openais_conn_partner_get(req_exec_lck_resourceunlock->source.conn), + openais_dispatch_send ( + req_exec_lck_resourceunlock->source.conn, &res_lib_lck_resourceunlockasync, sizeof (struct res_lib_lck_resourceunlockasync)); - openais_conn_send_response ( + openais_response_send ( resource_lock->callback_source.conn, &res_lib_lck_resourceunlockasync, sizeof (struct res_lib_lck_resourceunlockasync)); @@ -1196,8 +1193,10 @@ res_lib_lck_resourceunlock.header.size = sizeof (struct res_lib_lck_resourceunlock); res_lib_lck_resourceunlock.header.id = MESSAGE_RES_LCK_RESOURCEUNLOCK; res_lib_lck_resourceunlock.header.error = error; - openais_conn_send_response (req_exec_lck_resourceunlock->source.conn, - &res_lib_lck_resourceunlock, sizeof (struct res_lib_lck_resourceunlock)); + openais_response_send ( + req_exec_lck_resourceunlock->source.conn, + &res_lib_lck_resourceunlock, + sizeof (struct res_lib_lck_resourceunlock)); } } } @@ -1253,8 +1252,10 @@ res_lib_lck_lockpurge.header.size = sizeof (struct res_lib_lck_lockpurge); res_lib_lck_lockpurge.header.id = MESSAGE_RES_LCK_LOCKPURGE; res_lib_lck_lockpurge.header.error = error; - openais_conn_send_response (req_exec_lck_lockpurge->source.conn, - &res_lib_lck_lockpurge, sizeof (struct res_lib_lck_lockpurge)); + openais_response_send ( + req_exec_lck_lockpurge->source.conn, + &res_lib_lck_lockpurge, + sizeof (struct res_lib_lck_lockpurge)); } } @@ -1366,7 +1367,8 @@ res_lib_lck_resourceclose.header.id = MESSAGE_RES_LCK_RESOURCECLOSE; res_lib_lck_resourceclose.header.error = SA_AIS_ERR_NOT_EXIST; - openais_conn_send_response (conn, + openais_response_send ( + conn, &res_lib_lck_resourceclose, sizeof (struct res_lib_lck_resourceclose)); } Index: exec/msg.c =================================================================== --- exec/msg.c (revision 1498) +++ exec/msg.c (revision 1499) @@ -807,12 +807,12 @@ &req_exec_msg_queueopen->source, sizeof (mar_message_source_t)); - openais_conn_send_response ( + openais_response_send ( req_exec_msg_queueopen->source.conn, &res_lib_msg_queueopenasync, sizeof (struct res_lib_msg_queueopenasync)); - openais_conn_send_response ( - openais_conn_partner_get (req_exec_msg_queueopen->source.conn), + openais_dispatch_send ( + req_exec_msg_queueopen->source.conn, &res_lib_msg_queueopenasync, sizeof (struct res_lib_msg_queueopenasync)); } else { @@ -826,7 +826,7 @@ &req_exec_msg_queueopen->source, sizeof (mar_message_source_t)); - openais_conn_send_response ( + openais_dispatch_send ( req_exec_msg_queueopen->source.conn, &res_lib_msg_queueopen, sizeof (struct res_lib_msg_queueopen)); @@ -865,8 +865,10 @@ res_lib_msg_queueclose.header.size = sizeof (struct res_lib_msg_queueclose); res_lib_msg_queueclose.header.id = MESSAGE_RES_MSG_QUEUECLOSE; res_lib_msg_queueclose.header.error = error; - openais_conn_send_response (req_exec_msg_queueclose->source.conn, - &res_lib_msg_queueclose, sizeof (struct res_lib_msg_queueclose)); + openais_dispatch_send ( + req_exec_msg_queueclose->source.conn, + &res_lib_msg_queueclose, + sizeof (struct res_lib_msg_queueclose)); } } @@ -927,7 +929,7 @@ res_lib_msg_queuegroupcreate.header.id = MESSAGE_RES_MSG_QUEUEGROUPCREATE; res_lib_msg_queuegroupcreate.header.error = error; - openais_conn_send_response ( + openais_dispatch_send ( req_exec_msg_queuegroupcreate->source.conn, &res_lib_msg_queuegroupcreate, sizeof (struct res_lib_msg_queuegroupcreate)); @@ -974,7 +976,7 @@ res_lib_msg_queuegroupinsert.header.id = MESSAGE_RES_MSG_QUEUEGROUPCREATE; res_lib_msg_queuegroupinsert.header.error = error; - openais_conn_send_response ( + openais_dispatch_send ( req_exec_msg_queuegroupinsert->source.conn, &res_lib_msg_queuegroupinsert, sizeof (struct res_lib_msg_queuegroupinsert)); @@ -1018,7 +1020,7 @@ res_lib_msg_queuegroupremove.header.id = MESSAGE_RES_MSG_QUEUEGROUPCREATE; res_lib_msg_queuegroupremove.header.error = error; - openais_conn_send_response ( + openais_dispatch_send ( req_exec_msg_queuegroupremove->source.conn, &res_lib_msg_queuegroupremove, sizeof (struct res_lib_msg_queuegroupremove)); @@ -1049,7 +1051,7 @@ res_lib_msg_queuegroupdelete.header.id = MESSAGE_RES_MSG_QUEUEGROUPCREATE; res_lib_msg_queuegroupdelete.header.error = error; - openais_conn_send_response ( + openais_dispatch_send ( req_exec_msg_queuegroupdelete->source.conn, &res_lib_msg_queuegroupdelete, sizeof (struct res_lib_msg_queuegroupdelete)); Index: lib/cpg.c =================================================================== --- lib/cpg.c (revision 1498) +++ lib/cpg.c (revision 1499) @@ -107,8 +107,7 @@ goto error_destroy; } - error = saServiceConnect (&cpg_inst->dispatch_fd, - &cpg_inst->response_fd, + error = saServiceConnect (&cpg_inst->response_fd, &cpg_inst->dispatch_fd, CPG_SERVICE); if (error != SA_AIS_OK) { goto error_put_destroy; @@ -461,7 +460,7 @@ iov[0].iov_base = &req_lib_cpg_trackstart; iov[0].iov_len = sizeof (struct req_lib_cpg_trackstart); - error = saSendMsgReceiveReply (cpg_inst->dispatch_fd, iov, 1, + error = saSendMsgReceiveReply (cpg_inst->response_fd, iov, 1, &res_lib_cpg_trackstart, sizeof (struct res_lib_cpg_trackstart)); if (error != SA_AIS_OK) { Index: README.devmap =================================================================== --- README.devmap (revision 1498) +++ README.devmap (revision 1499) @@ -1221,3 +1221,91 @@ for other people. Have fun! + +--- +IPC state machine + +lib_exit_fn may not use openais_response_send or openais_dispatch_send + +state INITIALIZING +------------------ +receive response end of a library request + create conn_io data structure + if the connection's UID/GID is invalid + conn_io_disconnect + send response to conn_io with conn_io's address + set conn_io refcnt to 1 + +receive dispatch end of a library connection with response end conn_io address + find matching connection + if the connection's UID/GID is invalid + conn_io_disconnect + send reponse to conn_io + create conn_info data structure + if conn_io response end is valid + store dispatch end of conn_io into conn_info data structure + store response conn_io into conn_info data structure + set conn_io refcnt to 1 + call lib_init_fn for service type + set conn_info refcnt to 1 + set state to ACTIVE + +event response disconnects and conn_info not bound + disconnect connection +event receive connects and response connection not found + disconnect dispatch +event no dispatch connection within 5 seconds + disconnect dispatch +event disconnect_request + set state to DISCONNECT_REQUESTED + decrement response conn_io refcnt by 1 + decrement dispatch conn_io refcnt by 1 + +state ACTIVE +------------ +do { + increment conn_io refcnt by 1 + poll + if invalid poll set state internal_disconnect_request + dispatch library handler functions + flush any output that can be flushed + decrement conn_io refcnt by 1 + if state set to DISCONNECT_REQUESTED + execute algorithm for disconnect requested state +} + +event internal_disconnect_request + set state to DISCONNECT_REQUESTED + decrement response conn_io refcnt by 1 + decrement dispatch conn_io refcnt by 1 +event openais_conn_refcnt_increment + increase conn_info reference count by 1 +event openais_conn_refcnt_decrement + decrease conn_info reference count by 1 + +state DISCONNECT_REQUESTED +-------------------------- +if this is the response conn_io data +do { + if response conn_io refcnt is 0 + destroy conn_io data structure + decrement conn_info reference count + exit thread + sleep configurable short duration +} + +if this is the dispatch conn_io data +do { + if dispatch conn_io refcnt is 0 + call lib_exit_fn + if successful + destroy conn_io data structure + decrement conn_info reference count + exit thread + sleep configurable short duration +} + +when conn_info reference count equals 0 + free conn_info data structure + +