57 #include <sys/types.h> 59 #include <sys/socket.h> 62 #include <sys/ioctl.h> 63 #include <sys/param.h> 64 #include <netinet/in.h> 65 #include <arpa/inet.h> 78 #include <qb/qbdefs.h> 79 #include <qb/qbutil.h> 80 #include <qb/qbloop.h> 86 #define LOGSYS_UTILS_ONLY 1 95 #define LOCALHOST_IP inet_addr("127.0.0.1") 96 #define QUEUE_RTR_ITEMS_SIZE_MAX 16384 97 #define RETRANS_MESSAGE_QUEUE_SIZE_MAX 16384 98 #define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500 100 #define RETRANSMIT_ENTRIES_MAX 30 101 #define TOKEN_SIZE_MAX 64000 102 #define LEAVE_DUMMY_NODEID 0 114 #define SEQNO_START_MSG 0x0 115 #define SEQNO_START_TOKEN 0x0 137 #define ENDIAN_LOCAL 0xff22 374 struct sq regular_sort_queue;
376 struct sq recovery_sort_queue;
433 void (*totemsrp_log_printf) (
436 const char *
function,
439 const char *format, ...)
__attribute__((format(printf, 6, 7))); /* single terminator: a stray extra ';' is an empty member declaration */
449 void (*totemsrp_deliver_fn) (
452 unsigned int msg_len,
453 int endian_conversion_required);
455 void (*totemsrp_confchg_fn) (
457 const unsigned int *member_list,
size_t member_list_entries,
458 const unsigned int *left_list,
size_t left_list_entries,
459 const unsigned int *joined_list,
size_t joined_list_entries,
462 void (*totemsrp_service_ready_fn) (void);
464 void (*totemsrp_waiting_trans_ack_cb_fn) (
465 int waiting_trans_ack);
467 void (*memb_ring_id_create_or_load) (
471 void (*memb_ring_id_store) (
472 const struct memb_ring_id *memb_ring_id,
525 char commit_token_storage[40000];
530 int (*handler_functions[6]) (
534 int endian_conversion_needed);
579 static int message_handler_orf_token (
583 int endian_conversion_needed);
585 static int message_handler_mcast (
589 int endian_conversion_needed);
591 static int message_handler_memb_merge_detect (
595 int endian_conversion_needed);
597 static int message_handler_memb_join (
601 int endian_conversion_needed);
603 static int message_handler_memb_commit_token (
607 int endian_conversion_needed);
609 static int message_handler_token_hold_cancel (
613 int endian_conversion_needed);
617 static unsigned int main_msgs_missing (
void);
619 static void main_token_seqid_get (
622 unsigned int *token_is);
624 static void srp_addr_copy (
struct srp_addr *dest,
const struct srp_addr *src);
626 static void srp_addr_to_nodeid (
627 unsigned int *nodeid_out,
629 unsigned int entries);
631 static int srp_addr_equal (
const struct srp_addr *a,
const struct srp_addr *b);
637 static void messages_deliver_to_app (
struct totemsrp_instance *instance,
int skip,
unsigned int end_point);
639 int fcc_mcasts_allowed);
640 static void messages_free (
struct totemsrp_instance *instance,
unsigned int token_aru);
644 static void target_set_completed (
void *context);
646 static void memb_state_commit_token_target_set (
struct totemsrp_instance *instance);
/*
 * Forward declarations of the per-message-type endian conversion helpers.
 * Each takes a read-only source message (in) and a writable destination
 * (out). The message handlers in this file call them when their
 * endian_conversion_needed argument is non-zero.
 */
651 static void orf_token_endian_convert (
const struct orf_token *in,
struct orf_token *out);
652 static void memb_commit_token_endian_convert (
const struct memb_commit_token *in,
struct memb_commit_token *out);
653 static void memb_join_endian_convert (
const struct memb_join *in,
struct memb_join *out);
654 static void mcast_endian_convert (
const struct mcast *in,
struct mcast *out);
655 static void memb_merge_detect_endian_convert (
658 static void srp_addr_copy_endian_convert (
struct srp_addr *out,
const struct srp_addr *in);
/*
 * Forward declarations of the timer callbacks defined further down in this
 * file. Each one takes a single untyped context pointer (data); where this
 * file calls one directly (timer_function_pause_timeout) it passes the
 * totemsrp instance.
 */
659 static void timer_function_orf_token_timeout (
void *data);
660 static void timer_function_pause_timeout (
void *data);
661 static void timer_function_heartbeat_timeout (
void *data);
662 static void timer_function_token_retransmit_timeout (
void *data);
663 static void timer_function_token_hold_retransmit_timeout (
void *data);
664 static void timer_function_merge_detect_timeout (
void *data);
666 static void totemsrp_buffer_release (
struct totemsrp_instance *instance,
void *ptr);
672 unsigned int msg_len);
677 unsigned int iface_no);
682 message_handler_orf_token,
683 message_handler_mcast,
684 message_handler_memb_merge_detect,
685 message_handler_memb_join,
686 message_handler_memb_commit_token,
687 message_handler_token_hold_cancel
691 #define log_printf(level, format, args...) \ 693 instance->totemsrp_log_printf ( \ 694 level, instance->totemsrp_subsys_id, \ 695 __FUNCTION__, __FILE__, __LINE__, \ 698 #define LOGSYS_PERROR(err_num, level, fmt, args...) \ 700 char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \ 701 const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \ 702 instance->totemsrp_log_printf ( \ 703 level, instance->totemsrp_subsys_id, \ 704 __FUNCTION__, __FILE__, __LINE__, \ 705 fmt ": %s (%d)\n", ##args, _error_ptr, err_num); \ 711 return gather_state_from_desc[gsfrom];
751 static void main_token_seqid_get (
754 unsigned int *token_is)
766 static unsigned int main_msgs_missing (
void)
775 uint64_t timestamp_msec;
778 now_msec = (qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC);
783 "Process pause detected for %u ms, flushing membership messages.", (
unsigned int)(now_msec - timestamp_msec)); /* %u matches the explicit unsigned int cast */
798 unsigned long long nano_secs = qb_util_nano_current_get ();
800 time_now = (nano_secs / QB_TIME_NS_IN_MSEC);
833 qb_loop_t *poll_handle,
841 unsigned int msg_len,
842 int endian_conversion_required),
846 const unsigned int *member_list,
size_t member_list_entries,
847 const unsigned int *left_list,
size_t left_list_entries,
848 const unsigned int *joined_list,
size_t joined_list_entries,
850 void (*waiting_trans_ack_cb_fn) (
856 if (instance == NULL) {
860 totemsrp_instance_initialize (instance);
898 "Token Timeout (%d ms) retransmit timeout (%d ms)",
901 "token hold (%d ms) retransmits before loss (%d retrans)",
904 "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)",
911 "downcheck (%d ms) fail to recv const (%d msgs)",
917 "window size per rotation (%d messages) maximum messages per rotation (%d messages)",
921 "missed count const (%d messages)",
925 "send threads (%d threads)", totem_config->
threads);
927 "RRP token expired timeout (%d ms)",
930 "RRP token problem counter (%d ms)",
933 "RRP threshold (%d problem count)",
936 "RRP multicast threshold (%d problem count)",
939 "RRP automatic recovery check timeout (%d ms)",
966 timer_function_pause_timeout (instance);
970 "HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0");
981 "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)",
985 "heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay");
987 "heartbeat timeout should be less than the token timeout. HeartBeat is Disabled !!");
1003 main_iface_change_fn,
1004 main_token_seqid_get,
1006 target_set_completed);
1023 token_event_stats_collector,
1029 token_event_stats_collector,
1031 *srp_context = instance;
1044 memb_leave_message_send (instance);
1064 unsigned int nodeid,
1066 unsigned int interfaces_size,
1068 unsigned int *iface_count)
1072 unsigned int found = 0;
1085 if (interfaces_size >= *iface_count) {
1105 if (interfaces_size >= *iface_count) {
1122 const char *cipher_type,
1123 const char *hash_type)
1172 static int srp_addr_equal (
const struct srp_addr *a,
const struct srp_addr *b)
1177 for (i = 0; i < 1; i++) {
1186 static void srp_addr_copy (
struct srp_addr *dest,
const struct srp_addr *src)
1197 static void srp_addr_to_nodeid (
1198 unsigned int *nodeid_out,
1200 unsigned int entries)
1204 for (i = 0; i < entries; i++) {
1205 nodeid_out[i] = srp_addr_in[i].
addr[0].
nodeid;
1209 static void srp_addr_copy_endian_convert (
struct srp_addr *out,
const struct srp_addr *in)
1223 static void memb_set_subtract (
1224 struct srp_addr *out_list,
int *out_list_entries,
1225 struct srp_addr *one_list,
int one_list_entries,
1226 struct srp_addr *two_list,
int two_list_entries)
1232 *out_list_entries = 0;
1234 for (i = 0; i < one_list_entries; i++) {
1235 for (j = 0; j < two_list_entries; j++) {
1236 if (srp_addr_equal (&one_list[i], &two_list[j])) {
1242 srp_addr_copy (&out_list[*out_list_entries], &one_list[i]);
1243 *out_list_entries = *out_list_entries + 1;
1252 static void memb_consensus_set (
1279 static int memb_consensus_isset (
1296 static int memb_consensus_agreed (
1299 struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
1300 int token_memb_entries = 0;
1304 memb_set_subtract (token_memb, &token_memb_entries,
1308 for (i = 0; i < token_memb_entries; i++) {
1309 if (memb_consensus_isset (instance, &token_memb[i]) == 0) {
1324 assert (token_memb_entries >= 1);
1329 static void memb_consensus_notset (
1331 struct srp_addr *no_consensus_list,
1332 int *no_consensus_list_entries,
1334 int comparison_list_entries)
1338 *no_consensus_list_entries = 0;
1341 if (memb_consensus_isset (instance, &instance->
my_proc_list[i]) == 0) {
1342 srp_addr_copy (&no_consensus_list[*no_consensus_list_entries], &instance->
my_proc_list[i]);
1343 *no_consensus_list_entries = *no_consensus_list_entries + 1;
1351 static int memb_set_equal (
1352 struct srp_addr *set1,
int set1_entries,
1353 struct srp_addr *set2,
int set2_entries)
1360 if (set1_entries != set2_entries) {
1363 for (i = 0; i < set2_entries; i++) {
1364 for (j = 0; j < set1_entries; j++) {
1365 if (srp_addr_equal (&set1[j], &set2[i])) {
1381 static int memb_set_subset (
1382 const struct srp_addr *subset,
int subset_entries,
1383 const struct srp_addr *fullset,
int fullset_entries)
1389 if (subset_entries > fullset_entries) {
1392 for (i = 0; i < subset_entries; i++) {
1393 for (j = 0; j < fullset_entries; j++) {
1394 if (srp_addr_equal (&subset[i], &fullset[j])) {
1408 static void memb_set_merge (
1409 const struct srp_addr *subset,
int subset_entries,
1410 struct srp_addr *fullset,
int *fullset_entries)
1416 for (i = 0; i < subset_entries; i++) {
1417 for (j = 0; j < *fullset_entries; j++) {
1418 if (srp_addr_equal (&fullset[j], &subset[i])) {
1424 srp_addr_copy (&fullset[*fullset_entries], &subset[i]);
1425 *fullset_entries = *fullset_entries + 1;
1432 static void memb_set_and_with_ring_id (
1448 for (i = 0; i < set2_entries; i++) {
1449 for (j = 0; j < set1_entries; j++) {
1450 if (srp_addr_equal (&set1[j], &set2[i])) {
1451 if (memcmp (&set1_ring_ids[j], old_ring_id,
sizeof (
struct memb_ring_id)) == 0) {
1458 srp_addr_copy (&and[*and_entries], &set1[j]);
1459 *and_entries = *and_entries + 1;
1466 #ifdef CODE_COVERAGE 1467 static void memb_set_print (
1474 printf (
"List '%s' contains %d entries:\n",
string, list_entries);
1476 for (i = 0; i < list_entries; i++) {
1477 printf (
"Address %d with %d rings\n", i, list[i].
no_addrs);
1478 for (j = 0; j < list[i].
no_addrs; j++) {
1479 printf (
"\tiface %d %s\n", j,
totemip_print (&list[i].addr[j]));
1480 printf (
"\tfamily %d\n", list[i].addr[j].
family);
1485 static void my_leave_memb_clear(
1492 static unsigned int my_leave_memb_match(
1494 unsigned int nodeid)
1497 unsigned int ret = 0;
1508 static void my_leave_memb_set(
1510 unsigned int nodeid)
1527 "Cannot set LEAVE nodeid=%d", nodeid);
1534 assert (instance != NULL);
1538 static void totemsrp_buffer_release (
struct totemsrp_instance *instance,
void *ptr)
1540 assert (instance != NULL);
1552 timer_function_token_retransmit_timeout,
1564 timer_function_merge_detect_timeout,
1590 "Saving state aru %x high seq received %x",
1600 "Restoring instance->my_aru %x my high seq received %x",
1607 "Resetting old ring state");
1618 timer_function_pause_timeout,
1628 timer_function_orf_token_timeout,
1638 timer_function_heartbeat_timeout,
1651 static void cancel_token_retransmit_timeout (
struct totemsrp_instance *instance)
1656 static void start_token_hold_retransmit_timeout (
struct totemsrp_instance *instance)
1662 timer_function_token_hold_retransmit_timeout,
1666 static void cancel_token_hold_retransmit_timeout (
struct totemsrp_instance *instance)
1672 static void memb_state_consensus_timeout_expired (
1675 struct srp_addr no_consensus_list[PROCESSOR_COUNT_MAX];
1676 int no_consensus_list_entries;
1679 if (memb_consensus_agreed (instance)) {
1680 memb_consensus_reset (instance);
1682 memb_consensus_set (instance, &instance->
my_id);
1684 reset_token_timeout (instance);
1686 memb_consensus_notset (
1689 &no_consensus_list_entries,
1693 memb_set_merge (no_consensus_list, no_consensus_list_entries,
1706 static void timer_function_pause_timeout (
void *data)
1711 reset_pause_timeout (instance);
1716 old_ring_state_restore (instance);
1721 static void timer_function_orf_token_timeout (
void *data)
1728 "The token was lost in the OPERATIONAL state.");
1730 "A processor failed, forming new configuration.");
1738 "The consensus timeout expired.");
1739 memb_state_consensus_timeout_expired (instance);
1746 "The token was lost in the COMMIT state.");
1753 "The token was lost in the RECOVERY state.");
1754 memb_recovery_state_token_loss (instance);
1760 static void timer_function_heartbeat_timeout (
void *data)
1764 "HeartBeat Timer expired Invoking token loss mechanism in state %d ", instance->
memb_state);
1765 timer_function_orf_token_timeout(data);
1768 static void memb_timer_function_state_gather (
void *data)
1779 memb_join_message_send (instance);
1790 memb_timer_function_state_gather,
1796 static void memb_timer_function_gather_consensus_timeout (
void *data)
1799 memb_state_consensus_timeout_expired (instance);
1802 static void deliver_messages_from_recovery_to_regular (
struct totemsrp_instance *instance)
1807 unsigned int range = 0;
1820 for (i = 1; i <= range; i++) {
1826 recovery_message_item = ptr;
1831 mcast = recovery_message_item->
mcast;
1837 regular_message_item.
mcast =
1838 (
struct mcast *)(((
char *)recovery_message_item->
mcast) +
sizeof (
struct mcast));
1839 regular_message_item.
msg_len =
1840 recovery_message_item->
msg_len -
sizeof (
struct mcast);
1841 mcast = regular_message_item.
mcast;
1850 "comparing if ring id is for this processors old ring seqno %d",
1864 &regular_message_item, mcast->
seq); /* pass the address of the local regular_message_item filled in above */
1871 "-not adding msg with seq no %x", mcast->
seq);
1881 struct srp_addr joined_list[PROCESSOR_COUNT_MAX];
1882 int joined_list_entries = 0;
1883 unsigned int aru_save;
1890 char left_node_msg[1024];
1891 char joined_node_msg[1024];
1892 char failed_node_msg[1024];
1896 memb_consensus_reset (instance);
1898 old_ring_state_reset (instance);
1900 deliver_messages_from_recovery_to_regular (instance);
1903 "Delivering to app %x to %x",
1906 aru_save = instance->
my_aru;
1919 memb_set_subtract (joined_list, &joined_list_entries,
1947 srp_addr_to_nodeid (trans_memb_list_totemip,
1960 instance->
my_aru = aru_save;
1970 joined_list, joined_list_entries,
1975 srp_addr_to_nodeid (new_memb_list_totemip,
1977 srp_addr_to_nodeid (joined_list_totemip, joined_list,
1978 joined_list_entries);
1982 joined_list_totemip, joined_list_entries, &instance->
my_ring_id);
2044 regular_message = ptr;
2045 free (regular_message->
mcast);
2051 if (joined_list_entries) {
2053 sptr += snprintf(joined_node_msg,
sizeof(joined_node_msg)-sptr,
" joined:");
2054 for (i=0; i< joined_list_entries; i++) {
2055 sptr += snprintf(joined_node_msg+sptr,
sizeof(joined_node_msg)-sptr,
" %u", joined_list_totemip[i]);
2059 joined_node_msg[0] =
'\0';
2065 sptr += snprintf(left_node_msg,
sizeof(left_node_msg)-sptr,
" left:");
2067 sptr += snprintf(left_node_msg+sptr,
sizeof(left_node_msg)-sptr,
" %u", left_list[i]);
2070 if (my_leave_memb_match(instance, left_list[i]) == 0) {
2072 sptr2 += snprintf(failed_node_msg,
sizeof(failed_node_msg)-sptr2,
" failed:");
2074 sptr2 += snprintf(failed_node_msg+sptr2,
sizeof(failed_node_msg)-sptr2, /* bound by the buffer actually being written, not left_node_msg */
" %u", left_list[i]);
2078 failed_node_msg[0] =
'\0';
2082 left_node_msg[0] =
'\0';
2083 failed_node_msg[0] =
'\0';
2086 my_leave_memb_clear(instance);
2089 "entering OPERATIONAL state.");
2091 "A new membership (%s:%lld) was formed. Members%s%s",
2097 if (strlen(failed_node_msg)) {
2099 "Failed to receive the leave message.%s",
2110 reset_pause_timeout (instance);
2123 static void memb_state_gather_enter (
2132 &instance->
my_id, 1,
2135 memb_join_message_send (instance);
2146 memb_timer_function_state_gather,
2159 memb_timer_function_gather_consensus_timeout,
2165 cancel_token_retransmit_timeout (instance);
2166 cancel_token_timeout (instance);
2167 cancel_merge_detect_timeout (instance);
2169 memb_consensus_reset (instance);
2171 memb_consensus_set (instance, &instance->
my_id);
2174 "entering GATHER state from %d(%s).",
2175 gather_from, gsfrom_to_msg(gather_from));
2190 static void timer_function_token_retransmit_timeout (
void *data);
2192 static void target_set_completed (
2197 memb_state_commit_token_send (instance);
2201 static void memb_state_commit_enter (
2204 old_ring_state_save (instance);
2206 memb_state_commit_token_update (instance);
2208 memb_state_commit_token_target_set (instance);
2224 "entering COMMIT state.");
2227 reset_token_retransmit_timeout (instance);
2228 reset_token_timeout (instance);
2244 static void memb_state_recovery_enter (
2249 int local_received_flg = 1;
2250 unsigned int low_ring_aru;
2251 unsigned int range = 0;
2252 unsigned int messages_originated = 0;
2255 struct memb_ring_id my_new_memb_ring_id_list[PROCESSOR_COUNT_MAX];
2261 "entering RECOVERY state.");
2272 memb_state_commit_token_send_recovery (instance, commit_token);
2287 memcpy (&my_new_memb_ring_id_list[i],
2288 &memb_list[i].ring_id,
2291 memb_set_and_with_ring_id (
2293 my_new_memb_ring_id_list,
2307 "position [%d] member %s:", i,
totemip_print (&addr[i].addr[0]));
2309 "previous ring seq %llx rep %s",
2314 "aru %x high delivered %x received flag %d",
2332 local_received_flg = 0;
2336 if (local_received_flg == 1) {
2352 if (sq_lt_compare (memb_list[i].
aru, low_ring_aru)) {
2354 low_ring_aru = memb_list[i].
aru;
2375 "copying all old ring messages from %x-%x.",
2378 for (i = 1; i <= range; i++) {
2385 low_ring_aru + i, &ptr);
2389 sort_queue_item = ptr;
2390 messages_originated++;
2391 memset (&message_item, 0,
sizeof (
struct message_item));
2393 message_item.
mcast = totemsrp_buffer_alloc (instance);
2394 assert (message_item.
mcast);
2404 memcpy (((
char *)message_item.
mcast) + sizeof (
struct mcast),
2405 sort_queue_item->
mcast,
2410 "Originated %d messages in RECOVERY.", messages_originated);
2415 "Did not need to originate any messages in recovery.");
2425 reset_token_timeout (instance);
2426 reset_token_retransmit_timeout (instance);
2439 token_hold_cancel_send (instance);
2446 struct iovec *iovec,
2447 unsigned int iov_len,
2454 unsigned int addr_idx;
2463 if (cs_queue_is_full (queue_use)) {
2468 memset (&message_item, 0,
sizeof (
struct message_item));
2473 message_item.
mcast = totemsrp_buffer_alloc (instance);
2474 if (message_item.
mcast == 0) {
2481 memset(message_item.
mcast, 0, sizeof (
struct mcast));
2491 addr = (
char *)message_item.
mcast;
2492 addr_idx = sizeof (
struct mcast);
2493 for (i = 0; i < iov_len; i++) {
2494 memcpy (&addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len);
2495 addr_idx += iovec[i].iov_len;
2498 message_item.
msg_len = addr_idx;
2502 cs_queue_item_add (queue_use, &message_item);
2524 cs_queue_avail (queue_use, &avail);
2535 static int orf_token_remcast (
2543 struct sq *sort_queue;
2551 res = sq_in_range (sort_queue, seq);
2560 res = sq_item_get (sort_queue, seq, &ptr);
2565 sort_queue_item = ptr;
2569 sort_queue_item->
mcast,
2579 static void messages_free (
2581 unsigned int token_aru)
2586 int log_release = 0;
2587 unsigned int release_to;
2588 unsigned int range = 0;
2590 release_to = token_aru;
2591 if (sq_lt_compare (instance->
my_last_aru, release_to)) {
2611 for (i = 1; i <= range; i++) {
2617 regular_message = ptr;
2618 totemsrp_buffer_release (instance, regular_message->
mcast);
2629 "releasing messages up to and including %x", release_to);
2633 static void update_aru (
2638 struct sq *sort_queue;
2640 unsigned int my_aru_saved = 0;
2650 my_aru_saved = instance->
my_aru;
2651 for (i = 1; i <= range; i++) {
2655 res = sq_item_get (sort_queue, my_aru_saved + i, &ptr);
2663 instance->
my_aru += i - 1;
2669 static int orf_token_mcast (
2672 int fcc_mcasts_allowed)
2676 struct sq *sort_queue;
2679 unsigned int fcc_mcast_current;
2684 reset_token_retransmit_timeout (instance);
2695 for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) {
2696 if (cs_queue_is_empty (mcast_queue)) {
2699 message_item = (
struct message_item *)cs_queue_item_get (mcast_queue);
2707 memset (&sort_queue_item, 0,
sizeof (
struct sort_queue_item));
2711 mcast = sort_queue_item.
mcast;
2718 sq_item_add (sort_queue, &sort_queue_item, message_item->
mcast->
seq);
2722 message_item->
mcast,
2728 cs_queue_item_remove (mcast_queue);
2736 update_aru (instance);
2741 return (fcc_mcast_current);
2748 static int orf_token_rtr (
2751 unsigned int *fcc_allowed)
2756 struct sq *sort_queue;
2758 unsigned int range = 0;
2759 char retransmit_msg[1024];
2768 rtr_list = &orf_token->
rtr_list[0];
2770 strcpy (retransmit_msg,
"Retransmit List: ");
2775 sprintf (value,
"%x ", rtr_list[i].seq);
2776 strcat (retransmit_msg, value);
2778 strcat (retransmit_msg,
"");
2780 "%s", retransmit_msg);
2793 if (memcmp (&rtr_list[i].ring_id, &instance->
my_ring_id,
2800 res = orf_token_remcast (instance, rtr_list[i].seq);
2807 memmove (&rtr_list[i], &rtr_list[i + 1],
2823 range = orf_token->
seq - instance->
my_aru;
2827 (i <= range); i++) {
2832 res = sq_in_range (sort_queue, instance->
my_aru + i);
2840 res = sq_item_inuse (sort_queue, instance->
my_aru + i);
2851 res = sq_item_miss_count (sort_queue, instance->
my_aru + i);
2861 if (instance->
my_aru + i == rtr_list[j].
seq) {
2891 static void timer_function_token_retransmit_timeout (
void *data)
2901 token_retransmit (instance);
2902 reset_token_retransmit_timeout (instance);
2907 static void timer_function_token_hold_retransmit_timeout (
void *data)
2918 token_retransmit (instance);
2923 static void timer_function_merge_detect_timeout(
void *data)
2932 memb_merge_detect_transmit (instance);
2945 static int token_send (
2947 struct orf_token *orf_token,
2951 unsigned int orf_token_size;
2953 orf_token_size =
sizeof (
struct orf_token) +
2954 (orf_token->rtr_list_entries *
sizeof (
struct rtr_item));
2961 if (forward_token == 0) {
2998 sizeof (
struct token_hold_cancel));
3005 struct orf_token orf_token;
3037 res = token_send (instance, &orf_token, 1);
3042 static void memb_state_commit_token_update (
3047 unsigned int high_aru;
3083 if (sq_lt_compare (high_aru, memb_list[i].
aru)) {
3084 high_aru = memb_list[i].
aru;
3094 if (sq_lt_compare (memb_list[i].
aru, high_aru)) {
3109 static void memb_state_commit_token_target_set (
3126 static int memb_state_commit_token_send_recovery (
3128 struct memb_commit_token *commit_token)
3130 unsigned int commit_token_size;
3134 commit_token_size =
sizeof (
struct memb_commit_token) +
3135 ((sizeof (struct srp_addr) +
3136 sizeof (struct memb_commit_token_memb_entry)) * commit_token->
addr_entries);
3152 reset_token_retransmit_timeout (instance);
3156 static int memb_state_commit_token_send (
3159 unsigned int commit_token_size;
3163 commit_token_size =
sizeof (
struct memb_commit_token) +
3164 ((sizeof (struct srp_addr) +
3181 reset_token_retransmit_timeout (instance);
3188 struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3189 int token_memb_entries = 0;
3193 memb_set_subtract (token_memb, &token_memb_entries,
3201 lowest_addr = &token_memb[0].
addr[0];
3202 for (i = 1; i < token_memb_entries; i++) {
3210 static int srp_addr_compare (
const void *a,
const void *b)
3218 static void memb_state_commit_token_create (
3221 struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3224 int token_memb_entries = 0;
3227 "Creating commit token because I am the rep.");
3229 memb_set_subtract (token_memb, &token_memb_entries,
3233 memset (instance->
commit_token, 0, sizeof (
struct memb_commit_token));
3248 qsort (token_memb, token_memb_entries,
sizeof (
struct srp_addr),
3257 memcpy (addr, token_memb,
3258 token_memb_entries *
sizeof (
struct srp_addr));
3259 memset (memb_list, 0,
3265 char memb_join_data[40000];
3268 unsigned int addr_idx;
3285 addr = (
char *)memb_join;
3286 addr_idx =
sizeof (
struct memb_join);
3287 memcpy (&addr[addr_idx],
3294 memcpy (&addr[addr_idx],
3317 char memb_join_data[40000];
3320 unsigned int addr_idx;
3321 int active_memb_entries;
3322 struct srp_addr active_memb[PROCESSOR_COUNT_MAX];
3325 "sending join/leave message");
3332 &instance->
my_id, 1,
3335 memb_set_subtract (active_memb, &active_memb_entries,
3337 &instance->
my_id, 1);
3356 addr = (
char *)memb_join;
3357 addr_idx =
sizeof (
struct memb_join);
3358 memcpy (&addr[addr_idx],
3360 active_memb_entries *
3363 active_memb_entries *
3365 memcpy (&addr[addr_idx],
3401 sizeof (
struct memb_merge_detect));
3404 static void memb_ring_id_set (
3423 token_hold_cancel_send (instance);
3426 if (callback_handle == 0) {
3429 *handle_out = (
void *)callback_handle;
3430 list_init (&callback_handle->
list);
3432 callback_handle->
data = (
void *) data;
3434 callback_handle->
delete =
delete;
3453 list_del (&h->
list);
3460 static void token_callbacks_execute (
3466 struct list_head *callback_listhead = 0;
3482 for (list = callback_listhead->
next; list != callback_listhead;
3485 token_callback_instance =
list_entry (list,
struct token_callback_instance, list);
3487 list_next = list->
next;
3488 del = token_callback_instance->
delete;
3495 token_callback_instance->
data);
3499 if (res == -1 && del == 1) {
3500 list_add (list, callback_listhead);
3502 free (token_callback_instance);
3526 if (queue_use != NULL) {
3527 backlog = cs_queue_used (queue_use);
3534 static int fcc_calculate (
3536 struct orf_token *token)
3538 unsigned int transmits_allowed;
3539 unsigned int backlog_calc;
3547 instance->
my_cbl = backlog_get (instance);
3556 if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
3557 transmits_allowed = backlog_calc;
3561 return (transmits_allowed);
3567 static void fcc_rtr_limit (
3569 struct orf_token *token,
3570 unsigned int *transmits_allowed)
3574 assert (check >= 0);
3581 *transmits_allowed = 0;
3585 static void fcc_token_update (
3587 struct orf_token *token,
3588 unsigned int msgs_transmitted)
3590 token->
fcc += msgs_transmitted - instance->
my_trc;
3592 instance->
my_trc = msgs_transmitted;
3604 static int message_handler_orf_token (
3608 int endian_conversion_needed)
3610 char token_storage[1500];
3611 char token_convert[1500];
3612 struct orf_token *token = NULL;
3614 unsigned int transmits_allowed;
3615 unsigned int mcasted_retransmit;
3616 unsigned int mcasted_regular;
3617 unsigned int last_aru;
3620 unsigned long long tv_current;
3621 unsigned long long tv_diff;
3623 tv_current = qb_util_nano_current_get ();
3624 tv_diff = tv_current -
tv_old;
3625 tv_old = tv_current;
3628 "Time since last token %0.4f ms", ((
float)tv_diff) / 1000000.0);
3634 #ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE 3635 if (random()%100 < TEST_DROP_ORF_TOKEN_PERCENTAGE) {
3640 if (endian_conversion_needed) {
3641 orf_token_endian_convert ((
struct orf_token *)msg,
3642 (
struct orf_token *)token_convert);
3643 msg = (
struct orf_token *)token_convert;
3650 token = (
struct orf_token *)token_storage;
3651 memcpy (token, msg,
sizeof (
struct orf_token));
3652 memcpy (&token->
rtr_list[0], (
char *)msg + sizeof (
struct orf_token),
3660 start_merge_detect_timeout (instance);
3663 cancel_merge_detect_timeout (instance);
3664 cancel_token_hold_retransmit_timeout (instance);
3670 #ifdef TEST_RECOVERY_MSG_COUNT 3711 messages_free (instance, token->
aru);
3730 reset_heartbeat_timeout(instance);
3733 cancel_heartbeat_timeout(instance);
3748 transmits_allowed = fcc_calculate (instance, token);
3749 mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
3757 fcc_rtr_limit (instance, token, &transmits_allowed);
3758 mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
3765 fcc_token_update (instance, token, mcasted_retransmit +
3768 if (sq_lt_compare (instance->
my_aru, token->
aru) ||
3773 if (token->
aru == token->
seq) {
3779 if (token->
aru == last_aru && token->
aru_addr != 0) {
3794 "FAILED TO RECEIVE");
3798 memb_set_merge (&instance->
my_id, 1,
3825 "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x",
3838 "install seq %x aru %x high seq received %x",
3856 "retrans flag count %x token aru %x install seq %x aru %x %x",
3860 memb_state_operational_enter (instance);
3867 token_send (instance, token, forward_token);
3870 tv_current = qb_util_nano_current_get ();
3871 tv_diff = tv_current -
tv_old;
3872 tv_old = tv_current;
3875 ((
float)tv_diff) / 1000000.0);
3878 messages_deliver_to_app (instance, 0,
3886 reset_token_timeout (instance);
3887 reset_token_retransmit_timeout (instance);
3891 start_token_hold_retransmit_timeout (instance);
3901 reset_heartbeat_timeout(instance);
3904 cancel_heartbeat_timeout(instance);
3910 static void messages_deliver_to_app (
3913 unsigned int end_point)
3918 struct mcast *mcast_in;
3919 struct mcast mcast_header;
3920 unsigned int range = 0;
3921 int endian_conversion_required;
3922 unsigned int my_high_delivered_stored = 0;
3938 for (i = 1; i <= range; i++) {
3946 my_high_delivered_stored + i);
3952 my_high_delivered_stored + i, &ptr);
3956 if (res != 0 && skip == 0) {
3967 sort_queue_item_p = ptr;
3969 mcast_in = sort_queue_item_p->
mcast;
3970 assert (mcast_in != (
struct mcast *)0xdeadbeef);
3972 endian_conversion_required = 0;
3974 endian_conversion_required = 1;
3975 mcast_endian_convert (mcast_in, &mcast_header);
3977 memcpy (&mcast_header, mcast_in,
sizeof (
struct mcast));
3998 "Delivering MCAST message with seq %x to pending delivery queue",
4006 ((
char *)sort_queue_item_p->
mcast) + sizeof (
struct mcast),
4008 endian_conversion_required);
4015 static int message_handler_mcast (
4019 int endian_conversion_needed)
4022 struct sq *sort_queue;
4023 struct mcast mcast_header;
4026 if (endian_conversion_needed) {
4027 mcast_endian_convert (msg, &mcast_header);
4029 memcpy (&mcast_header, msg,
sizeof (
struct mcast));
4040 #ifdef TEST_DROP_MCAST_PERCENTAGE 4041 if (random()%100 < TEST_DROP_MCAST_PERCENTAGE) {
4061 if (!memb_set_subset (
4088 "Received ringid(%s:%lld) seq %x",
4098 sq_in_range (sort_queue, mcast_header.
seq) &&
4099 sq_item_inuse (sort_queue, mcast_header.
seq) == 0) {
4105 sort_queue_item.
mcast = totemsrp_buffer_alloc (instance);
4106 if (sort_queue_item.
mcast == NULL) {
4109 memcpy (sort_queue_item.
mcast, msg, msg_len);
4110 sort_queue_item.
msg_len = msg_len;
4113 mcast_header.
seq)) {
4117 sq_item_add (sort_queue, &sort_queue_item, mcast_header.
seq);
4120 update_aru (instance);
4129 static int message_handler_memb_merge_detect (
4133 int endian_conversion_needed)
4138 if (endian_conversion_needed) {
4139 memb_merge_detect_endian_convert (msg, &memb_merge_detect);
4141 memcpy (&memb_merge_detect, msg,
4142 sizeof (
struct memb_merge_detect));
4159 memb_set_merge (&memb_merge_detect.
system_from, 1,
4165 if (!memb_set_subset (
4171 memb_set_merge (&memb_merge_detect.
system_from, 1,
4189 static void memb_join_process (
4195 int gather_entered = 0;
4196 int fail_minus_memb_entries = 0;
4197 struct srp_addr fail_minus_memb[PROCESSOR_COUNT_MAX];
4213 "Discarding LEAVE message during flush, nodeid=%u",
4220 "Discarding JOIN message during flush, nodeid=%d", memb_join->
header.
nodeid);
4235 if (memb_set_equal (proc_list,
4240 memb_set_equal (failed_list,
4245 memb_consensus_set (instance, &memb_join->
system_from);
4247 if (memb_consensus_agreed (instance) && instance->
failed_to_recv == 1) {
4254 memb_state_commit_token_create (instance);
4256 memb_state_commit_enter (instance);
4259 if (memb_consensus_agreed (instance) &&
4260 memb_lowest_in_config (instance)) {
4262 memb_state_commit_token_create (instance);
4264 memb_state_commit_enter (instance);
4269 if (memb_set_subset (proc_list,
4274 memb_set_subset (failed_list,
4286 memb_set_merge (proc_list,
4290 if (memb_set_subset (
4291 &instance->
my_id, 1,
4298 if (memb_set_subset (
4303 if (memb_set_subset (
4308 memb_set_merge (failed_list,
4312 memb_set_subtract (fail_minus_memb,
4313 &fail_minus_memb_entries,
4319 memb_set_merge (fail_minus_memb,
4320 fail_minus_memb_entries,
4331 if (gather_entered == 0 &&
4338 static void memb_join_endian_convert (
const struct memb_join *in,
struct memb_join *out)
4360 srp_addr_copy_endian_convert (&out_proc_list[i], &in_proc_list[i]);
4363 srp_addr_copy_endian_convert (&out_failed_list[i], &in_failed_list[i]);
4367 static void memb_commit_token_endian_convert (
const struct memb_commit_token *in,
struct memb_commit_token *out)
4388 srp_addr_copy_endian_convert (&out_addr[i], &in_addr[i]);
4393 if (in_memb_list[i].ring_id.
rep.
family != 0) {
4406 static void orf_token_endian_convert (
const struct orf_token *in,
struct orf_token *out)
4430 static void mcast_endian_convert (
const struct mcast *in,
struct mcast *out)
4446 static void memb_merge_detect_endian_convert (
4458 static int ignore_join_under_operational (
4460 const struct memb_join *memb_join)
4470 if (memb_set_subset (&instance->
my_id, 1,
4488 static int message_handler_memb_join (
4492 int endian_conversion_needed)
4494 const struct memb_join *memb_join;
4495 struct memb_join *memb_join_convert = alloca (msg_len);
4497 if (endian_conversion_needed) {
4498 memb_join = memb_join_convert;
4499 memb_join_endian_convert (msg, memb_join_convert);
4509 if (pause_flush (instance)) {
4518 if (!ignore_join_under_operational (instance, memb_join)) {
4519 memb_join_process (instance, memb_join);
4524 memb_join_process (instance, memb_join);
4535 memb_join_process (instance, memb_join);
4548 memb_join_process (instance, memb_join);
4549 memb_recovery_state_token_loss (instance);
4557 static int message_handler_memb_commit_token (
4561 int endian_conversion_needed)
4563 struct memb_commit_token *memb_commit_token_convert = alloca (msg_len);
4564 struct memb_commit_token *memb_commit_token;
4565 struct srp_addr sub[PROCESSOR_COUNT_MAX];
4571 "got commit token");
4573 if (endian_conversion_needed) {
4574 memb_commit_token_endian_convert (msg, memb_commit_token_convert);
4576 memcpy (memb_commit_token_convert, msg, msg_len);
4578 memb_commit_token = memb_commit_token_convert;
4581 #ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
4582 if (random()%100 < TEST_DROP_COMMIT_TOKEN_PERCENTAGE) {
4592 memb_set_subtract (sub, &sub_entries,
4596 if (memb_set_equal (addr,
4602 memcpy (instance->
commit_token, memb_commit_token, msg_len);
4603 memb_state_commit_enter (instance);
4617 memb_state_recovery_enter (instance, memb_commit_token);
4632 "Sending initial ORF token");
4635 orf_token_send_initial (instance);
4636 reset_token_timeout (instance);
4637 reset_token_retransmit_timeout (instance);
4644 static int message_handler_token_hold_cancel (
4648 int endian_conversion_needed)
4657 timer_function_token_retransmit_timeout (instance);
4666 unsigned int msg_len)
4671 if (msg_len <
sizeof (
struct message_header)) {
4673 "Received message is too short... ignoring %u.",
4674 (
unsigned int)msg_len);
4679 switch (message_header->
type) {
4700 printf (
"wrong message type\n");
4717 unsigned int iface_no)
4733 "Created or loaded sequence id %llx.%s for this ring.",
4760 void (*totem_service_ready) (
void))
void(* totemsrp_service_ready_fn)(void)
void(* totemsrp_deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required)
void(*) enum memb_stat memb_state)
int totemrrp_iface_check(void *rrp_context)
void main_iface_change_fn(void *context, const struct totem_ip_address *iface_address, unsigned int iface_no)
void totemip_copy_endian_convert(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
struct srp_addr system_from
struct memb_ring_id ring_id
uint32_t waiting_trans_ack
struct srp_addr system_from
struct memb_ring_id ring_id
int totemsrp_log_level_debug
struct memb_ring_id my_ring_id
Totem Single Ring Protocol.
uint64_t memb_commit_token_rx
int my_leave_memb_entries
struct message_header header
unsigned int old_ring_state_high_seq_received
unsigned int proc_list_entries
struct totem_interface * interfaces
unsigned int interface_count
int totemsrp_my_family_get(void *srp_context)
void(* totemsrp_confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
totemsrp_token_stats_t token[TOTEM_TOKEN_STATS_MAX]
const char * totemip_print(const struct totem_ip_address *addr)
int totemsrp_log_level_error
#define LEAVE_DUMMY_NODEID
struct memb_ring_id ring_id
qb_loop_timer_handle timer_heartbeat_timeout
unsigned int failed_list_entries
unsigned char end_of_memb_join[0]
unsigned long long int tv_old
#define SEQNO_START_TOKEN
void(* memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, const struct totem_ip_address *addr)
unsigned int token_hold_timeout
struct memb_ring_id ring_id
struct totem_ip_address member_list[PROCESSOR_COUNT_MAX]
int totemip_compare(const void *a, const void *b)
int totemsrp_member_add(void *context, const struct totem_ip_address *member, int ring_no)
void * token_sent_event_handle
struct srp_addr system_from
int totemsrp_log_level_notice
unsigned int totemsrp_my_nodeid_get(void *srp_context)
char rrp_mode[TOTEM_RRP_MODE_BYTES]
void totemsrp_net_mtu_adjust(struct totem_config *totem_config)
int totemsrp_log_level_warning
int totemsrp_crypto_set(void *srp_context, const char *cipher_type, const char *hash_type)
void totemrrp_membership_changed(void *rrp_context, enum totem_configuration_type configuration_type, const struct srp_addr *member_list, size_t member_list_entries, const struct srp_addr *left_list, size_t left_list_entries, const struct srp_addr *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
struct message_header header
uint64_t memb_merge_detect_rx
int totemsrp_ifaces_get(void *srp_context, unsigned int nodeid, struct totem_ip_address *interfaces, unsigned int interfaces_size, char ***status, unsigned int *iface_count)
struct cs_queue new_message_queue_trans
struct message_header header
unsigned char end_of_commit_token[0]
char commit_token_storage[40000]
unsigned int rrp_problem_count_timeout
struct list_head token_callback_sent_listhead
struct cs_queue new_message_queue
struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX]
void totemsrp_callback_token_destroy(void *srp_context, void **handle_out)
uint64_t gather_token_lost
int totemsrp_log_level_trace
void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
int totemrrp_ifaces_get(void *rrp_context, char ***status, unsigned int *iface_count)
struct memb_ring_id my_old_ring_id
void * totemrrp_buffer_alloc(void *rrp_context)
unsigned int downcheck_timeout
struct srp_addr my_new_memb_list[PROCESSOR_COUNT_MAX]
uint64_t memb_commit_token_tx
int my_deliver_memb_entries
unsigned int max_network_delay
unsigned int heartbeat_failures_allowed
#define TOTEM_TOKEN_STATS_MAX
struct message_item __attribute__
unsigned long long token_ring_id_seq
struct totem_ip_address mcast_address
int totemsrp_callback_token_create(void *srp_context, void **handle_out, enum totem_callback_token_type type, int delete, int(*callback_fn)(enum totem_callback_token_type type, const void *), const void *data)
unsigned int send_join_timeout
void totemsrp_service_ready_register(void *context, void(*totem_service_ready)(void))
unsigned int rrp_problem_count_threshold
struct srp_addr my_memb_list[PROCESSOR_COUNT_MAX]
uint64_t operational_entered
void(*) in log_level_security)
unsigned long long ring_seq
int totemsrp_mcast(void *srp_context, struct iovec *iovec, unsigned int iov_len, int guarantee)
Multicast a message.
uint64_t operational_token_lost
unsigned int received_flg
uint64_t consensus_timeouts
Totem Network interface - also does encryption/decryption.
unsigned int my_high_delivered
struct message_handlers totemsrp_message_handlers
qb_loop_timer_handle memb_timer_state_gather_consensus_timeout
uint64_t recovery_token_lost
unsigned char end_of_memb_join[0]
unsigned int token_retransmits_before_loss_const
struct message_header header
int totemrrp_finalize(void *rrp_context)
struct list_head token_callback_received_listhead
int totemrrp_member_remove(void *rrp_context, const struct totem_ip_address *member, int iface_no)
struct rtr_item rtr_list[0]
int totemsrp_ring_reenable(void *srp_context)
struct memb_ring_id ring_id
unsigned int seqno_unchanged_const
uint64_t commit_token_lost
unsigned int miss_count_const
int totemrrp_crypto_set(void *rrp_context, const char *cipher_type, const char *hash_type)
uint64_t token_hold_cancel_rx
unsigned int join_timeout
uint32_t originated_orf_token
int totemrrp_send_flush(void *rrp_context)
struct message_header header
struct totem_ip_address mcast_addr
#define MESSAGE_QUEUE_MAX
int totemrrp_member_add(void *rrp_context, const struct totem_ip_address *member, int iface_no)
unsigned int received_flg
struct totem_ip_address rep
unsigned int last_released
int orf_token_retransmit_size
int totemsrp_avail(void *srp_context)
Return number of available messages that can be queued.
unsigned int rrp_autorecovery_check_timeout
#define RETRANS_MESSAGE_QUEUE_SIZE_MAX
void(* memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, const struct totem_ip_address *addr)
unsigned int fail_to_recv_const
void * token_recv_event_handle
struct totem_ip_address boundto
unsigned int my_high_seq_received
int totemrrp_initialize(qb_loop_t *poll_handle, void **rrp_context, struct totem_config *totem_config, totemsrp_stats_t *stats, void *context, void(*deliver_fn)(void *context, const void *msg, unsigned int msg_len), void(*iface_change_fn)(void *context, const struct totem_ip_address *iface_addr, unsigned int iface_no), void(*token_seqid_get)(const void *msg, unsigned int *seqid, unsigned int *token_is), unsigned int(*msgs_missing)(void), void(*target_set_completed)(void *context))
Create an instance.
qb_loop_t * totemsrp_poll_handle
qb_loop_timer_handle timer_pause_timeout
qb_loop_timer_handle timer_merge_detect_timeout
int my_merge_detect_timeout_outstanding
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf
int totemsrp_log_level_security
qb_loop_timer_handle timer_orf_token_retransmit_timeout
struct totem_config * totem_config
int(* callback_fn)(enum totem_callback_token_type type, const void *)
qb_loop_timer_handle timer_orf_token_timeout
uint32_t continuous_gather
void totemsrp_threaded_mode_enable(void *context)
int totemsrp_initialize(qb_loop_t *poll_handle, void **srp_context, struct totem_config *totem_config, totemmrp_stats_t *stats, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id), void(*waiting_trans_ack_cb_fn)(int waiting_trans_ack))
Create a protocol instance.
void totemsrp_event_signal(void *srp_context, enum totem_event_type type, int value)
int totemrrp_recv_flush(void *rrp_context)
uint32_t orf_token_discard
int my_failed_list_entries
struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX]
uint64_t token_hold_cancel_tx
void(* totem_memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, const struct totem_ip_address *addr)
unsigned int token_timeout
unsigned int high_delivered
unsigned int consensus_timeout
void main_deliver_fn(void *context, const void *msg, unsigned int msg_len)
#define PROCESSOR_COUNT_MAX
unsigned short endian_detector
void totemrrp_buffer_release(void *rrp_context, void *ptr)
Totem Network interface - also does encryption/decryption.
char orf_token_retransmit[TOKEN_SIZE_MAX]
struct message_header header
struct sq regular_sort_queue
void totemsrp_finalize(void *srp_context)
#define QUEUE_RTR_ITEMS_SIZE_MAX
struct srp_addr my_failed_list[PROCESSOR_COUNT_MAX]
struct cs_queue retrans_message_queue
const char * gather_state_from_desc[]
qb_loop_timer_handle memb_timer_state_gather_join_timeout
int my_trans_memb_entries
void(* totemsrp_waiting_trans_ack_cb_fn)(int waiting_trans_ack)
uint64_t memb_merge_detect_tx
unsigned int high_delivered
struct rtr_item rtr_list[0]
int consensus_list_entries
unsigned int rrp_problem_count_mcast_threshold
int totemrrp_processor_count_set(void *rrp_context, unsigned int processor_count)
int totemrrp_mcast_noflush_send(void *rrp_context, const void *msg, unsigned int msg_len)
unsigned char end_of_commit_token[0]
uint32_t threaded_mode_enabled
enum totem_callback_token_type callback_type
int totemrrp_mcast_recv_empty(void *rrp_context)
#define list_entry(ptr, type, member)
unsigned long long ring_seq
struct totem_logging_configuration totem_logging_configuration
int totemrrp_mcast_flush_send(void *rrp_context, const void *msg, unsigned int msg_len)
struct memb_ring_id ring_id
#define log_printf(level, format, args...)
void totemsrp_trans_ack(void *context)
unsigned int max_messages
uint64_t recovery_entered
qb_loop_timer_handle memb_timer_state_commit_timeout
struct memb_commit_token * commit_token
struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX]
struct srp_addr system_from
struct srp_addr my_proc_list[PROCESSOR_COUNT_MAX]
int(* handler_functions[6])(struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed)
unsigned int merge_timeout
unsigned int use_heartbeat
struct message_header header
int totemsrp_member_remove(void *context, const struct totem_ip_address *member, int ring_no)
unsigned int token_retransmit_timeout
struct srp_addr my_trans_memb_list[PROCESSOR_COUNT_MAX]
#define RETRANSMIT_ENTRIES_MAX
void(* totemsrp_log_printf)(int level, int sybsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf
int totemip_equal(const struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
unsigned int my_token_seq
struct memb_ring_id ring_id
int totemrrp_ring_reenable(void *rrp_context, unsigned int iface_no)
unsigned int my_leave_memb_list[PROCESSOR_COUNT_MAX]
qb_loop_timer_handle timer_orf_token_hold_retransmit_timeout
struct totem_ip_address addr[INTERFACE_MAX]
unsigned int rrp_token_expired_timeout
struct memb_ring_id ring_id
unsigned int my_install_seq
int totemrrp_token_send(void *rrp_context, const void *msg, unsigned int msg_len)
void(* totem_memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, const struct totem_ip_address *addr)
struct sq recovery_sort_queue
int totemrrp_token_target_set(void *rrp_context, struct totem_ip_address *addr, unsigned int iface_no)
totem_callback_token_type
unsigned int my_high_ring_delivered