55 #include <sys/types.h>
57 #include <sys/socket.h>
60 #include <sys/ioctl.h>
61 #include <sys/param.h>
62 #include <netinet/in.h>
63 #include <arpa/inet.h>
76 #include <qb/qblist.h>
77 #include <qb/qbdefs.h>
78 #include <qb/qbutil.h>
79 #include <qb/qbloop.h>
84 #define LOGSYS_UTILS_ONLY 1
/*
 * File-local constants. Only the values are visible at this point; what each
 * limit governs should be confirmed at its point of use.
 */
/* Expands to an inet_addr() call expression, so it is evaluated at every use. */
92 #define LOCALHOST_IP inet_addr("127.0.0.1")
/* NOTE(review): the two 16384 limits are equal; presumably queue capacities - confirm. */
93 #define QUEUE_RTR_ITEMS_SIZE_MAX 16384
94 #define RETRANS_MESSAGE_QUEUE_SIZE_MAX 16384
95 #define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500
97 #define RETRANSMIT_ENTRIES_MAX 30
98 #define TOKEN_SIZE_MAX 64000
/* NOTE(review): presumably a placeholder node id used when leaving - confirm. */
99 #define LEAVE_DUMMY_NODEID 0
/* Both starting sequence values are zero. */
118 #define SEQNO_START_MSG 0x0
119 #define SEQNO_START_TOKEN 0x0
/* NOTE(review): presumably the local byte-order marker compared against received headers - confirm. */
141 #define ENDIAN_LOCAL 0xff22
438 const char *
function,
441 const char *format, ...)
__attribute__((format(printf, 6, 7)));;
454 unsigned int msg_len,
455 int endian_conversion_required);
459 const unsigned int *member_list,
size_t member_list_entries,
460 const unsigned int *left_list,
size_t left_list_entries,
461 const unsigned int *joined_list,
size_t joined_list_entries,
536 int endian_conversion_needed);
581 static int message_handler_orf_token (
585 int endian_conversion_needed);
587 static int message_handler_mcast (
591 int endian_conversion_needed);
593 static int message_handler_memb_merge_detect (
597 int endian_conversion_needed);
599 static int message_handler_memb_join (
603 int endian_conversion_needed);
605 static int message_handler_memb_commit_token (
609 int endian_conversion_needed);
611 static int message_handler_token_hold_cancel (
615 int endian_conversion_needed);
619 static void srp_addr_to_nodeid (
621 unsigned int *nodeid_out,
623 unsigned int entries);
/*
 * Forward declaration: element-equality test for two srp_addr values.
 * Neither argument is modified (both are const).
 * The memb_set_* helpers in this file call it on pairs of list entries and
 * test the int result directly as a truth value.
 */
625 static int srp_addr_equal (
const struct srp_addr *a,
const struct srp_addr *b);
/*
 * Forward declaration; the definition appears later in this file.
 *
 * instance:  totemsrp instance to operate on.
 * skip:      the definition tests `skip == 0` together with a failed queue
 *            lookup. NOTE(review): presumably a non-zero value lets delivery
 *            continue past missing items - confirm against the definition.
 *            The orf token handler passes 0.
 * end_point: NOTE(review): presumably the last sequence number to deliver -
 *            confirm against the definition.
 */
631 static void messages_deliver_to_app (
struct totemsrp_instance *instance,
int skip,
unsigned int end_point);
633 int fcc_mcasts_allowed);
/*
 * Forward declaration; the definition appears later in this file.
 *
 * The definition takes token_aru as its initial release point, compares it
 * with instance->my_last_aru using sq_lt_compare, passes each released
 * message's mcast buffer to totemsrp_buffer_release and logs
 * "releasing messages up to and including %x".
 * The orf token handler calls it with token->aru.
 */
634 static void messages_free (
struct totemsrp_instance *instance,
unsigned int token_aru);
/*
 * Forward declaration of a callback taking an opaque context pointer.
 * It is passed by name as an argument during instance setup, and its
 * definition calls memb_state_commit_token_send (instance).
 * NOTE(review): context is presumably the totemsrp instance - confirm.
 */
638 static void target_set_completed (
void *context);
/*
 * Forward declaration; the definition appears later in this file.
 * memb_state_commit_enter calls it right after
 * memb_state_commit_token_update (instance).
 */
640 static void memb_state_commit_token_target_set (
struct totemsrp_instance *instance);
/*
 * Forward declarations of the per-message-type byte-order converters.
 * Each reads the message at `in` (const, never written) and fills `out`.
 * The message handlers in this file call them only when their
 * endian_conversion_needed argument is non-zero.
 */

/* Converter for struct orf_token; used by message_handler_orf_token. */
645 static void orf_token_endian_convert (
const struct orf_token *in,
struct orf_token *out);
/* Converter for struct memb_join; used by message_handler_memb_join. */
647 static void memb_join_endian_convert (
const struct memb_join *in,
struct memb_join *out);
/* Converter for struct mcast; used by message_handler_mcast and messages_deliver_to_app. */
648 static void mcast_endian_convert (
const struct mcast *in,
struct mcast *out);
649 static void memb_merge_detect_endian_convert (
/*
 * Forward declarations of the timer callbacks.
 * Each takes a single opaque pointer; direct callers in this file pass the
 * totemsrp instance (or a context pointer) as that argument.
 * Most of them are named alongside an instance->timer_* handle where the
 * corresponding timer is set up.
 */

/* Token-loss handler; it logs a different message per membership state. */
653 static void timer_function_orf_token_timeout (
void *data);
/* Logs "Token has not been received in %d ms ". */
654 static void timer_function_orf_token_warning (
void *data);
/* Calls reset_pause_timeout (instance); also invoked directly during setup. */
655 static void timer_function_pause_timeout (
void *data);
/* Logs the heartbeat expiry and then calls timer_function_orf_token_timeout (data). */
656 static void timer_function_heartbeat_timeout (
void *data);
/* Calls token_retransmit (instance) and reset_token_retransmit_timeout (instance). */
657 static void timer_function_token_retransmit_timeout (
void *data);
/* Calls token_retransmit (instance). */
658 static void timer_function_token_hold_retransmit_timeout (
void *data);
/* Calls memb_merge_detect_transmit (instance). */
659 static void timer_function_merge_detect_timeout (
void *data);
/*
 * Forward declaration; the definition appears later in this file and
 * asserts that instance is non-NULL.
 * messages_free passes a stored message's mcast buffer as ptr.
 * NOTE(review): ptr is presumably a buffer obtained from the matching
 * allocation routine of this instance - confirm.
 */
661 static void totemsrp_buffer_release (
struct totemsrp_instance *instance,
void *ptr);
667 unsigned int msg_len,
673 unsigned int iface_no);
678 message_handler_orf_token,
679 message_handler_mcast,
680 message_handler_memb_merge_detect,
681 message_handler_memb_join,
682 message_handler_memb_commit_token,
683 message_handler_token_hold_cancel
687 #define log_printf(level, format, args...) \
689 instance->totemsrp_log_printf ( \
690 level, instance->totemsrp_subsys_id, \
691 __FUNCTION__, __FILE__, __LINE__, \
694 #define LOGSYS_PERROR(err_num, level, fmt, args...) \
696 char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \
697 const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \
698 instance->totemsrp_log_printf ( \
699 level, instance->totemsrp_subsys_id, \
700 __FUNCTION__, __FILE__, __LINE__, \
701 fmt ": %s (%d)\n", ##args, _error_ptr, err_num); \
748 uint64_t timestamp_msec;
751 now_msec = (qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC);
756 "Process pause detected for %d ms, flushing membership messages.", (
unsigned int)(now_msec - timestamp_msec));
771 unsigned long long nano_secs = qb_util_nano_current_get ();
773 time_now = (nano_secs / QB_TIME_NS_IN_MSEC);
802 static void totempg_mtu_changed(
void *context,
int net_mtu)
809 "Net MTU changed to %d, new value is %d",
817 qb_loop_t *poll_handle,
825 unsigned int msg_len,
826 int endian_conversion_required),
830 const unsigned int *member_list,
size_t member_list_entries,
831 const unsigned int *left_list,
size_t left_list_entries,
832 const unsigned int *joined_list,
size_t joined_list_entries,
834 void (*waiting_trans_ack_cb_fn) (
835 int waiting_trans_ack))
841 if (instance == NULL) {
845 totemsrp_instance_initialize (instance);
883 "Token Timeout (%d ms) retransmit timeout (%d ms)",
888 "Token warning every %d ms (%d%% of Token Timeout)",
890 if (token_warning_ms < totem_config->token_retransmit_timeout)
892 "The token warning interval (%d ms) is less than the token retransmit timeout (%d ms) "
893 "which can lead to spurious token warnings. Consider increasing the token_warning parameter.",
897 "Token warnings disabled");
900 "token hold (%d ms) retransmits before loss (%d retrans)",
903 "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)",
910 "downcheck (%d ms) fail to recv const (%d msgs)",
916 "window size per rotation (%d messages) maximum messages per rotation (%d messages)",
920 "missed count const (%d messages)",
948 timer_function_pause_timeout (instance);
952 "HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0");
963 "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)",
967 "heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay");
969 "heartbeat timeout should be less than the token timeout. Heartbeat is disabled!!");
987 target_set_completed);
1009 token_event_stats_collector,
1015 token_event_stats_collector,
1017 *srp_context = instance;
1029 memb_leave_message_send (instance);
1050 unsigned int *interface_id,
1052 unsigned int interfaces_size,
1054 unsigned int *iface_count)
1070 interface_id[num_ifs] = i;
1072 if (++num_ifs > interfaces_size) {
1081 *iface_count = num_ifs;
1087 const char *cipher_type,
1088 const char *hash_type)
1125 static int srp_addr_equal (
const struct srp_addr *a,
const struct srp_addr *b)
1133 static void srp_addr_to_nodeid (
1135 unsigned int *nodeid_out,
1137 unsigned int entries)
1141 for (i = 0; i < entries; i++) {
1142 nodeid_out[i] = srp_addr_in[i].
nodeid;
1160 static void memb_set_subtract (
1161 struct srp_addr *out_list,
int *out_list_entries,
1162 struct srp_addr *one_list,
int one_list_entries,
1163 struct srp_addr *two_list,
int two_list_entries)
1169 *out_list_entries = 0;
1171 for (i = 0; i < one_list_entries; i++) {
1172 for (j = 0; j < two_list_entries; j++) {
1173 if (srp_addr_equal (&one_list[i], &two_list[j])) {
1179 out_list[*out_list_entries] = one_list[i];
1180 *out_list_entries = *out_list_entries + 1;
1189 static void memb_consensus_set (
1213 static int memb_consensus_isset (
1230 static int memb_consensus_agreed (
1234 int token_memb_entries = 0;
1238 memb_set_subtract (token_memb, &token_memb_entries,
1242 for (i = 0; i < token_memb_entries; i++) {
1243 if (memb_consensus_isset (instance, &token_memb[i]) == 0) {
1258 assert (token_memb_entries >= 1);
1263 static void memb_consensus_notset (
1265 struct srp_addr *no_consensus_list,
1266 int *no_consensus_list_entries,
1268 int comparison_list_entries)
1272 *no_consensus_list_entries = 0;
1275 if (memb_consensus_isset (instance, &instance->
my_proc_list[i]) == 0) {
1276 no_consensus_list[*no_consensus_list_entries] = instance->
my_proc_list[i];
1277 *no_consensus_list_entries = *no_consensus_list_entries + 1;
1285 static int memb_set_equal (
1286 struct srp_addr *set1,
int set1_entries,
1287 struct srp_addr *set2,
int set2_entries)
1294 if (set1_entries != set2_entries) {
1297 for (i = 0; i < set2_entries; i++) {
1298 for (j = 0; j < set1_entries; j++) {
1299 if (srp_addr_equal (&set1[j], &set2[i])) {
1315 static int memb_set_subset (
1316 const struct srp_addr *subset,
int subset_entries,
1317 const struct srp_addr *fullset,
int fullset_entries)
1323 if (subset_entries > fullset_entries) {
1326 for (i = 0; i < subset_entries; i++) {
1327 for (j = 0; j < fullset_entries; j++) {
1328 if (srp_addr_equal (&subset[i], &fullset[j])) {
1342 static void memb_set_merge (
1343 const struct srp_addr *subset,
int subset_entries,
1344 struct srp_addr *fullset,
int *fullset_entries)
1350 for (i = 0; i < subset_entries; i++) {
1351 for (j = 0; j < *fullset_entries; j++) {
1352 if (srp_addr_equal (&fullset[j], &subset[i])) {
1358 fullset[*fullset_entries] = subset[i];
1359 *fullset_entries = *fullset_entries + 1;
1366 static void memb_set_and_with_ring_id (
1382 for (i = 0; i < set2_entries; i++) {
1383 for (j = 0; j < set1_entries; j++) {
1384 if (srp_addr_equal (&set1[j], &set2[i])) {
1385 if (memcmp (&set1_ring_ids[j], old_ring_id,
sizeof (
struct memb_ring_id)) == 0) {
1392 and[*and_entries] = set1[j];
1393 *and_entries = *and_entries + 1;
1400 static void memb_set_log(
1411 memset(list_str, 0,
sizeof(list_str));
1413 for (i = 0; i < list_entries; i++) {
1420 if (strlen(list_str) + strlen(int_buf) >=
sizeof(list_str)) {
1423 strcat(list_str, int_buf);
1426 log_printf(level,
"List '%s' contains %d entries: %s",
string, list_entries, list_str);
1429 static void my_leave_memb_clear(
1436 static unsigned int my_leave_memb_match(
1441 unsigned int ret = 0;
1452 static void my_leave_memb_set(
1478 assert (instance != NULL);
1482 static void totemsrp_buffer_release (
struct totemsrp_instance *instance,
void *ptr)
1484 assert (instance != NULL);
1498 timer_function_token_retransmit_timeout,
1499 &instance->timer_orf_token_retransmit_timeout);
1515 timer_function_merge_detect_timeout,
1516 &instance->timer_merge_detect_timeout);
1544 "Saving state aru %x high seq received %x",
1554 "Restoring instance->my_aru %x my high seq received %x",
1561 "Resetting old ring state");
1574 timer_function_pause_timeout,
1575 &instance->timer_pause_timeout);
1589 timer_function_orf_token_warning,
1590 &instance->timer_orf_token_warning);
1604 timer_function_orf_token_timeout,
1605 &instance->timer_orf_token_timeout);
1611 reset_token_warning(instance);
1622 timer_function_heartbeat_timeout,
1623 &instance->timer_heartbeat_timeout);
1638 cancel_token_warning(instance);
1645 static void cancel_token_retransmit_timeout (
struct totemsrp_instance *instance)
1650 static void start_token_hold_retransmit_timeout (
struct totemsrp_instance *instance)
1658 timer_function_token_hold_retransmit_timeout,
1659 &instance->timer_orf_token_hold_retransmit_timeout);
1665 static void cancel_token_hold_retransmit_timeout (
struct totemsrp_instance *instance)
1671 static void memb_state_consensus_timeout_expired (
1675 int no_consensus_list_entries;
1678 if (memb_consensus_agreed (instance)) {
1679 memb_consensus_reset (instance);
1681 memb_consensus_set (instance, &instance->
my_id);
1683 reset_token_timeout (instance);
1685 memb_consensus_notset (
1688 &no_consensus_list_entries,
1692 memb_set_merge (no_consensus_list, no_consensus_list_entries,
1705 static void timer_function_pause_timeout (
void *data)
1710 reset_pause_timeout (instance);
1715 old_ring_state_restore (instance);
1720 static void timer_function_orf_token_warning (
void *data)
1727 tv_diff = qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC -
1730 "Token has not been received in %d ms ", (
unsigned int) tv_diff);
1731 reset_token_warning(instance);
1733 cancel_token_warning(instance);
1737 static void timer_function_orf_token_timeout (
void *data)
1744 "The token was lost in the OPERATIONAL state.");
1746 "A processor failed, forming new configuration.");
1754 "The consensus timeout expired.");
1755 memb_state_consensus_timeout_expired (instance);
1762 "The token was lost in the COMMIT state.");
1769 "The token was lost in the RECOVERY state.");
1770 memb_recovery_state_token_loss (instance);
1776 static void timer_function_heartbeat_timeout (
void *data)
1780 "HeartBeat Timer expired Invoking token loss mechanism in state %d ", instance->
memb_state);
1781 timer_function_orf_token_timeout(data);
1784 static void memb_timer_function_state_gather (
void *data)
1796 memb_join_message_send (instance);
1807 memb_timer_function_state_gather,
1808 &instance->memb_timer_state_gather_join_timeout);
1817 static void memb_timer_function_gather_consensus_timeout (
void *data)
1820 memb_state_consensus_timeout_expired (instance);
1823 static void deliver_messages_from_recovery_to_regular (
struct totemsrp_instance *instance)
1828 unsigned int range = 0;
1841 for (i = 1; i <= range; i++) {
1847 recovery_message_item = ptr;
1858 regular_message_item.mcast =
1859 (
struct mcast *)(((
char *)recovery_message_item->
mcast) +
sizeof (
struct mcast));
1860 regular_message_item.msg_len =
1861 recovery_message_item->
msg_len -
sizeof (
struct mcast);
1862 mcast = regular_message_item.mcast;
1885 ®ular_message_item,
mcast->
seq);
1903 int joined_list_entries = 0;
1904 unsigned int aru_save;
1911 char left_node_msg[1024];
1912 char joined_node_msg[1024];
1913 char failed_node_msg[1024];
1917 memb_consensus_reset (instance);
1919 old_ring_state_reset (instance);
1921 deliver_messages_from_recovery_to_regular (instance);
1924 "Delivering to app %x to %x",
1927 aru_save = instance->
my_aru;
1940 memb_set_subtract (joined_list, &joined_list_entries,
1958 srp_addr_to_nodeid (instance, trans_memb_list_totemip,
1971 instance->
my_aru = aru_save;
1976 srp_addr_to_nodeid (instance, new_memb_list_totemip,
1978 srp_addr_to_nodeid (instance, joined_list_totemip, joined_list,
1979 joined_list_entries);
1983 joined_list_totemip, joined_list_entries, &instance->
my_ring_id);
2045 regular_message = ptr;
2046 free (regular_message->
mcast);
2052 if (joined_list_entries) {
2054 sptr += snprintf(joined_node_msg,
sizeof(joined_node_msg)-sptr,
" joined:");
2055 for (i=0; i< joined_list_entries; i++) {
2056 sptr += snprintf(joined_node_msg+sptr,
sizeof(joined_node_msg)-sptr,
" " CS_PRI_NODE_ID, joined_list_totemip[i]);
2060 joined_node_msg[0] =
'\0';
2066 sptr += snprintf(left_node_msg,
sizeof(left_node_msg)-sptr,
" left:");
2068 sptr += snprintf(left_node_msg+sptr,
sizeof(left_node_msg)-sptr,
" " CS_PRI_NODE_ID, left_list[i]);
2071 if (my_leave_memb_match(instance, left_list[i]) == 0) {
2073 sptr2 += snprintf(failed_node_msg,
sizeof(failed_node_msg)-sptr2,
" failed:");
2075 sptr2 += snprintf(failed_node_msg+sptr2,
sizeof(left_node_msg)-sptr2,
" " CS_PRI_NODE_ID, left_list[i]);
2079 failed_node_msg[0] =
'\0';
2083 left_node_msg[0] =
'\0';
2084 failed_node_msg[0] =
'\0';
2087 my_leave_memb_clear(instance);
2090 "entering OPERATIONAL state.");
2098 if (strlen(failed_node_msg)) {
2100 "Failed to receive the leave message.%s",
2111 reset_pause_timeout (instance);
2124 static void memb_state_gather_enter (
2135 &instance->
my_id, 1,
2138 memb_join_message_send (instance);
2149 memb_timer_function_state_gather,
2150 &instance->memb_timer_state_gather_join_timeout);
2165 memb_timer_function_gather_consensus_timeout,
2166 &instance->memb_timer_state_gather_consensus_timeout);
2174 cancel_token_retransmit_timeout (instance);
2175 cancel_token_timeout (instance);
2176 cancel_merge_detect_timeout (instance);
2178 memb_consensus_reset (instance);
2180 memb_consensus_set (instance, &instance->
my_id);
2183 "entering GATHER state from %d(%s).",
2184 gather_from, gsfrom_to_msg(gather_from));
/*
 * Repeated forward declaration: an identical prototype for this callback
 * already appears with the other timer_function_* prototypes near the top
 * of the file.
 */
2199 static void timer_function_token_retransmit_timeout (
void *data);
2201 static void target_set_completed (
2206 memb_state_commit_token_send (instance);
2210 static void memb_state_commit_enter (
2213 old_ring_state_save (instance);
2215 memb_state_commit_token_update (instance);
2217 memb_state_commit_token_target_set (instance);
2234 "entering COMMIT state.");
2237 reset_token_retransmit_timeout (instance);
2238 reset_token_timeout (instance);
2254 static void memb_state_recovery_enter (
2259 int local_received_flg = 1;
2260 unsigned int low_ring_aru;
2261 unsigned int range = 0;
2262 unsigned int messages_originated = 0;
2271 "entering RECOVERY state.");
2282 memb_state_commit_token_send_recovery (instance, commit_token);
2297 memcpy (&my_new_memb_ring_id_list[i],
2301 memb_set_and_with_ring_id (
2303 my_new_memb_ring_id_list,
2320 memb_list[i].ring_id.rep, (uint64_t)memb_list[i].ring_id.seq);
2323 "aru %x high delivered %x received flag %d",
2325 memb_list[i].high_delivered,
2326 memb_list[i].received_flg);
2337 memb_list[i].received_flg == 0) {
2341 local_received_flg = 0;
2345 if (local_received_flg == 1) {
2358 &memb_list[i].ring_id,
2361 if (sq_lt_compare (memb_list[i].
aru, low_ring_aru)) {
2363 low_ring_aru = memb_list[i].
aru;
2384 "copying all old ring messages from %x-%x.",
2387 for (i = 1; i <= range; i++) {
2394 low_ring_aru + i, &ptr);
2399 messages_originated++;
2422 "Originated %d messages in RECOVERY.", messages_originated);
2427 "Did not need to originate any messages in recovery.");
2437 reset_token_timeout (instance);
2438 reset_token_retransmit_timeout (instance);
2451 token_hold_cancel_send (instance);
2458 struct iovec *iovec,
2459 unsigned int iov_len,
2466 unsigned int addr_idx;
2475 if (cs_queue_is_full (queue_use)) {
2506 addr_idx = sizeof (
struct mcast);
2507 for (i = 0; i < iov_len; i++) {
2508 memcpy (&
addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len);
2509 addr_idx += iovec[i].iov_len;
2538 cs_queue_avail (queue_use, &avail);
2549 static int orf_token_remcast (
2557 struct sq *sort_queue;
2565 res = sq_in_range (sort_queue,
seq);
2574 res = sq_item_get (sort_queue,
seq, &ptr);
2593 static void messages_free (
2595 unsigned int token_aru)
2600 int log_release = 0;
2601 unsigned int release_to;
2602 unsigned int range = 0;
2604 release_to = token_aru;
2605 if (sq_lt_compare (instance->
my_last_aru, release_to)) {
2625 for (i = 1; i <= range; i++) {
2631 regular_message = ptr;
2632 totemsrp_buffer_release (instance, regular_message->
mcast);
2643 "releasing messages up to and including %x", release_to);
2647 static void update_aru (
2652 struct sq *sort_queue;
2654 unsigned int my_aru_saved = 0;
2664 my_aru_saved = instance->
my_aru;
2665 for (i = 1; i <= range; i++) {
2669 res = sq_item_get (sort_queue, my_aru_saved + i, &ptr);
2677 instance->
my_aru += i - 1;
2683 static int orf_token_mcast (
2686 int fcc_mcasts_allowed)
2690 struct sq *sort_queue;
2693 unsigned int fcc_mcast_current;
2698 reset_token_retransmit_timeout (instance);
2709 for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) {
2710 if (cs_queue_is_empty (mcast_queue)) {
2742 cs_queue_item_remove (mcast_queue);
2750 update_aru (instance);
2755 return (fcc_mcast_current);
2762 static int orf_token_rtr (
2765 unsigned int *fcc_allowed)
2770 struct sq *sort_queue;
2772 unsigned int range = 0;
2773 char retransmit_msg[1024];
2784 strcpy (retransmit_msg,
"Retransmit List: ");
2790 strcat (retransmit_msg,
value);
2792 strcat (retransmit_msg,
"");
2794 "%s", retransmit_msg);
2814 res = orf_token_remcast (instance,
rtr_list[i].
seq);
2841 (i <= range); i++) {
2846 res = sq_in_range (sort_queue, instance->
my_aru + i);
2854 res = sq_item_inuse (sort_queue, instance->
my_aru + i);
2865 res = sq_item_miss_count (sort_queue, instance->
my_aru + i);
2905 static void timer_function_token_retransmit_timeout (
void *data)
2915 token_retransmit (instance);
2916 reset_token_retransmit_timeout (instance);
2921 static void timer_function_token_hold_retransmit_timeout (
void *data)
2932 token_retransmit (instance);
2937 static void timer_function_merge_detect_timeout(
void *data)
2946 memb_merge_detect_transmit (instance);
2959 static int token_send (
2965 unsigned int orf_token_size;
2967 orf_token_size =
sizeof (
struct orf_token) +
2975 if (forward_token == 0) {
3053 res = token_send (instance, &
orf_token, 1);
3058 static void memb_state_commit_token_update (
3063 unsigned int high_aru;
3096 &memb_list[i].ring_id,
3099 if (sq_lt_compare (high_aru, memb_list[i].
aru)) {
3100 high_aru = memb_list[i].
aru;
3107 &memb_list[i].ring_id,
3110 if (sq_lt_compare (memb_list[i].
aru, high_aru)) {
3125 static void memb_state_commit_token_target_set (
3139 static int memb_state_commit_token_send_recovery (
3143 unsigned int commit_token_size;
3165 reset_token_retransmit_timeout (instance);
3169 static int memb_state_commit_token_send (
3172 unsigned int commit_token_size;
3194 reset_token_retransmit_timeout (instance);
3202 int token_memb_entries = 0;
3204 unsigned int lowest_nodeid;
3206 memb_set_subtract (token_memb, &token_memb_entries,
3213 assert(token_memb_entries > 0);
3215 lowest_nodeid = token_memb[0].nodeid;
3216 for (i = 1; i < token_memb_entries; i++) {
3217 if (lowest_nodeid > token_memb[i].
nodeid) {
3218 lowest_nodeid = token_memb[i].nodeid;
3224 static int srp_addr_compare (
const void *a,
const void *b)
3238 static void memb_state_commit_token_create (
3244 int token_memb_entries = 0;
3247 "Creating commit token because I am the rep.");
3249 memb_set_subtract (token_memb, &token_memb_entries,
3268 qsort (token_memb, token_memb_entries,
sizeof (
struct srp_addr),
3277 memcpy (
addr, token_memb,
3278 token_memb_entries *
sizeof (
struct srp_addr));
3279 memset (memb_list, 0,
3285 char memb_join_data[40000];
3288 unsigned int addr_idx;
3299 ((instance->my_proc_list_entries + instance->my_failed_list_entries) *
sizeof(
struct srp_addr));
3301 if (msg_len >
sizeof(memb_join_data)) {
3303 "memb_join_message too long. Ignoring message.");
3319 memcpy (&
addr[addr_idx],
3326 memcpy (&
addr[addr_idx],
3348 char memb_join_data[40000];
3351 unsigned int addr_idx;
3352 int active_memb_entries;
3357 "sending join/leave message");
3364 &instance->
my_id, 1,
3367 memb_set_subtract (active_memb, &active_memb_entries,
3369 &instance->
my_id, 1);
3372 ((active_memb_entries + instance->my_failed_list_entries) *
sizeof(
struct srp_addr));
3374 if (msg_len >
sizeof(memb_join_data)) {
3376 "memb_leave message too long. Ignoring message.");
3399 memcpy (&
addr[addr_idx],
3401 active_memb_entries *
3404 active_memb_entries *
3406 memcpy (&
addr[addr_idx],
3446 static void memb_ring_id_set (
3465 token_hold_cancel_send (instance);
3468 if (callback_handle == 0) {
3471 *handle_out = (
void *)callback_handle;
3472 qb_list_init (&callback_handle->
list);
3474 callback_handle->
data = (
void *)
data;
3476 callback_handle->
delete =
delete;
3495 qb_list_del (&h->
list);
3502 static void token_callbacks_execute (
3506 struct qb_list_head *list, *tmp_iter;
3507 struct qb_list_head *callback_listhead = 0;
3523 qb_list_for_each_safe(
list, tmp_iter, callback_listhead) {
3536 if (res == -1 && del == 1) {
3537 qb_list_add (
list, callback_listhead);
3563 if (queue_use != NULL) {
3564 backlog = cs_queue_used (queue_use);
3571 static int fcc_calculate (
3575 unsigned int transmits_allowed;
3576 unsigned int backlog_calc;
3584 instance->
my_cbl = backlog_get (instance);
3593 if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
3594 transmits_allowed = backlog_calc;
3598 return (transmits_allowed);
3604 static void fcc_rtr_limit (
3607 unsigned int *transmits_allowed)
3611 assert (check >= 0);
3618 *transmits_allowed = 0;
3622 static void fcc_token_update (
3625 unsigned int msgs_transmitted)
3627 token->
fcc += msgs_transmitted - instance->
my_trc;
3629 instance->
my_trc = msgs_transmitted;
3636 static int check_orf_token_sanity(
3641 int endian_conversion_needed)
3645 size_t required_len;
3647 if (msg_len > max_msg_len) {
3649 "Received orf_token message is too long... ignoring.");
3654 if (msg_len <
sizeof(
struct orf_token)) {
3656 "Received orf_token message is too short... ignoring.");
3661 if (endian_conversion_needed) {
3669 "Received orf_token message rtr_entries is corrupted... ignoring.");
3674 required_len =
sizeof(
struct orf_token) + rtr_entries *
sizeof(
struct rtr_item);
3675 if (msg_len < required_len) {
3677 "Received orf_token message is too short... ignoring.");
3685 static int check_mcast_sanity(
3689 int endian_conversion_needed)
3692 if (msg_len <
sizeof(
struct mcast)) {
3694 "Received mcast message is too short... ignoring.");
3702 static int check_memb_merge_detect_sanity(
3706 int endian_conversion_needed)
3711 "Received memb_merge_detect message is too short... ignoring.");
3719 static int check_memb_join_sanity(
3723 int endian_conversion_needed)
3728 size_t required_len;
3730 if (msg_len <
sizeof(
struct memb_join)) {
3732 "Received memb_join message is too short... ignoring.");
3740 if (endian_conversion_needed) {
3746 if (msg_len < required_len) {
3748 "Received memb_join message is too short... ignoring.");
3756 static int check_memb_commit_token_sanity(
3760 int endian_conversion_needed)
3764 size_t required_len;
3768 "Received memb_commit_token message is too short... ignoring.");
3774 if (endian_conversion_needed) {
3780 if (msg_len < required_len) {
3782 "Received memb_commit_token message is too short... ignoring.");
3790 static int check_token_hold_cancel_sanity(
3794 int endian_conversion_needed)
3799 "Received token_hold_cancel message is too short... ignoring.");
3815 static int message_handler_orf_token (
3819 int endian_conversion_needed)
3821 char token_storage[1500];
3822 char token_convert[1500];
3825 unsigned int transmits_allowed;
3826 unsigned int mcasted_retransmit;
3827 unsigned int mcasted_regular;
3828 unsigned int last_aru;
3831 unsigned long long tv_current;
3832 unsigned long long tv_diff;
3834 tv_current = qb_util_nano_current_get ();
3835 tv_diff = tv_current -
tv_old;
3839 "Time since last token %0.4f ms", ((
float)tv_diff) / 1000000.0);
3842 if (check_orf_token_sanity(instance, msg, msg_len,
sizeof(token_storage),
3843 endian_conversion_needed) == -1) {
3850 #ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE
3851 if (random()%100 < TEST_DROP_ORF_TOKEN_PERCENTAGE) {
3856 if (endian_conversion_needed) {
3857 orf_token_endian_convert ((
struct orf_token *)msg,
3859 msg = (
struct orf_token *)token_convert;
3866 token = (
struct orf_token *)token_storage;
3867 memcpy (token, msg,
sizeof (
struct orf_token));
3876 start_merge_detect_timeout (instance);
3879 cancel_merge_detect_timeout (instance);
3880 cancel_token_hold_retransmit_timeout (instance);
3886 #ifdef TEST_RECOVERY_MSG_COUNT
3928 messages_free (instance, token->
aru);
3947 reset_heartbeat_timeout(instance);
3950 cancel_heartbeat_timeout(instance);
3965 transmits_allowed = fcc_calculate (instance, token);
3966 mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
3974 fcc_rtr_limit (instance, token, &transmits_allowed);
3975 mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
3982 fcc_token_update (instance, token, mcasted_retransmit +
3985 if (sq_lt_compare (instance->
my_aru, token->
aru) ||
3990 if (token->
aru == token->
seq) {
3996 if (token->
aru == last_aru && token->
aru_addr != 0) {
4011 "FAILED TO RECEIVE");
4015 memb_set_merge (&instance->
my_id, 1,
4042 "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x",
4055 "install seq %x aru %x high seq received %x",
4073 "retrans flag count %x token aru %x install seq %x aru %x %x",
4077 memb_state_operational_enter (instance);
4084 token_send (instance, token, forward_token);
4087 tv_current = qb_util_nano_current_get ();
4088 tv_diff = tv_current -
tv_old;
4092 ((
float)tv_diff) / 1000000.0);
4095 messages_deliver_to_app (instance, 0,
4103 reset_token_timeout (instance);
4104 reset_token_retransmit_timeout (instance);
4108 start_token_hold_retransmit_timeout (instance);
4118 reset_heartbeat_timeout(instance);
4121 cancel_heartbeat_timeout(instance);
4127 static void messages_deliver_to_app (
4130 unsigned int end_point)
4135 struct mcast *mcast_in;
4136 struct mcast mcast_header;
4137 unsigned int range = 0;
4138 int endian_conversion_required;
4139 unsigned int my_high_delivered_stored = 0;
4140 struct srp_addr aligned_system_from;
4155 for (i = 1; i <= range; i++) {
4163 my_high_delivered_stored + i);
4169 my_high_delivered_stored + i, &ptr);
4173 if (res != 0 && skip == 0) {
4184 sort_queue_item_p = ptr;
4186 mcast_in = sort_queue_item_p->
mcast;
4187 assert (mcast_in != (
struct mcast *)0xdeadbeef);
4189 endian_conversion_required = 0;
4191 endian_conversion_required = 1;
4192 mcast_endian_convert (mcast_in, &mcast_header);
4194 memcpy (&mcast_header, mcast_in,
sizeof (
struct mcast));
4197 aligned_system_from = mcast_header.system_from;
4203 memb_set_subset (&aligned_system_from,
4217 "Delivering MCAST message with seq %x to pending delivery queue",
4224 mcast_header.header.nodeid,
4225 ((
char *)sort_queue_item_p->
mcast) + sizeof (
struct mcast),
4227 endian_conversion_required);
4234 static int message_handler_mcast (
4238 int endian_conversion_needed)
4241 struct sq *sort_queue;
4242 struct mcast mcast_header;
4243 struct srp_addr aligned_system_from;
4245 if (check_mcast_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4249 if (endian_conversion_needed) {
4250 mcast_endian_convert (msg, &mcast_header);
4252 memcpy (&mcast_header, msg,
sizeof (
struct mcast));
4263 #ifdef TEST_DROP_MCAST_PERCENTAGE
4264 if (random()%100 < TEST_DROP_MCAST_PERCENTAGE) {
4272 if (memcmp (&instance->
my_ring_id, &mcast_header.ring_id,
4275 aligned_system_from = mcast_header.system_from;
4280 &aligned_system_from, 1,
4286 if (!memb_set_subset (
4287 &aligned_system_from,
4292 memb_set_merge (&aligned_system_from, 1,
4314 mcast_header.ring_id.rep,
4315 (uint64_t)mcast_header.ring_id.seq,
4323 sq_in_range (sort_queue, mcast_header.seq) &&
4324 sq_item_inuse (sort_queue, mcast_header.seq) == 0) {
4338 mcast_header.seq)) {
4345 update_aru (instance);
4354 static int message_handler_memb_merge_detect (
4358 int endian_conversion_needed)
4361 struct srp_addr aligned_system_from;
4363 if (check_memb_merge_detect_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4367 if (endian_conversion_needed) {
4390 memb_set_merge (&aligned_system_from, 1,
4396 if (!memb_set_subset (
4397 &aligned_system_from,
4402 memb_set_merge (&aligned_system_from, 1,
4420 static void memb_join_process (
4426 int gather_entered = 0;
4427 int fail_minus_memb_entries = 0;
4429 struct srp_addr aligned_system_from;
4471 if (memb_set_equal (proc_list,
4476 memb_set_equal (failed_list,
4482 memb_consensus_set (instance, &aligned_system_from);
4485 if (memb_consensus_agreed (instance) && instance->
failed_to_recv == 1) {
4491 memb_state_commit_token_create (instance);
4493 memb_state_commit_enter (instance);
4496 if (memb_consensus_agreed (instance) &&
4497 memb_lowest_in_config (instance)) {
4499 memb_state_commit_token_create (instance);
4501 memb_state_commit_enter (instance);
4506 if (memb_set_subset (proc_list,
4511 memb_set_subset (failed_list,
4518 if (memb_set_subset (&aligned_system_from, 1,
4523 memb_set_merge (proc_list,
4527 if (memb_set_subset (
4528 &instance->
my_id, 1,
4532 &aligned_system_from, 1,
4535 if (memb_set_subset (
4536 &aligned_system_from, 1,
4540 if (memb_set_subset (
4541 &aligned_system_from, 1,
4545 memb_set_merge (failed_list,
4549 memb_set_subtract (fail_minus_memb,
4550 &fail_minus_memb_entries,
4556 memb_set_merge (fail_minus_memb,
4557 fail_minus_memb_entries,
4568 if (gather_entered == 0 &&
4575 static void memb_join_endian_convert (
const struct memb_join *in,
struct memb_join *out)
4598 out_proc_list[i] = srp_addr_endian_convert (in_proc_list[i]);
4601 out_failed_list[i] = srp_addr_endian_convert (in_failed_list[i]);
4627 out_addr[i] = srp_addr_endian_convert (in_addr[i]);
4644 static void orf_token_endian_convert (
const struct orf_token *in,
struct orf_token *out)
4669 static void mcast_endian_convert (
const struct mcast *in,
struct mcast *out)
4686 static void memb_merge_detect_endian_convert (
4699 static int ignore_join_under_operational (
4706 struct srp_addr aligned_system_from;
4713 if (memb_set_subset (&instance->
my_id, 1,
4722 if ((memb_set_subset (&aligned_system_from, 1,
4731 static int message_handler_memb_join (
4735 int endian_conversion_needed)
4738 struct memb_join *memb_join_convert = alloca (msg_len);
4739 struct srp_addr aligned_system_from;
4741 if (check_memb_join_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4745 if (endian_conversion_needed) {
4747 memb_join_endian_convert (msg, memb_join_convert);
4760 if (pause_flush (instance)) {
4769 if (!ignore_join_under_operational (instance,
memb_join)) {
4770 memb_join_process (instance,
memb_join);
4775 memb_join_process (instance,
memb_join);
4779 if (memb_set_subset (&aligned_system_from,
4786 memb_join_process (instance,
memb_join);
4792 if (memb_set_subset (&aligned_system_from,
4799 memb_join_process (instance,
memb_join);
4800 memb_recovery_state_token_loss (instance);
4808 static int message_handler_memb_commit_token (
4812 int endian_conversion_needed)
4822 "got commit token");
4824 if (check_memb_commit_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4828 if (endian_conversion_needed) {
4829 memb_commit_token_endian_convert (msg, memb_commit_token_convert);
4831 memcpy (memb_commit_token_convert, msg, msg_len);
4836 #ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
4837 if (random()%100 < TEST_DROP_COMMIT_TOKEN_PERCENTAGE) {
4847 memb_set_subtract (sub, &sub_entries,
4851 if (memb_set_equal (
addr,
4858 memb_state_commit_enter (instance);
4887 "Sending initial ORF token");
4890 orf_token_send_initial (instance);
4891 reset_token_timeout (instance);
4892 reset_token_retransmit_timeout (instance);
4899 static int message_handler_token_hold_cancel (
4903 int endian_conversion_needed)
4907 if (check_token_hold_cancel_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4916 timer_function_token_retransmit_timeout (instance);
4922 static int check_message_header_validity(
4925 unsigned int msg_len,
4930 const char *guessed_str;
4931 const char *msg_byte = msg;
4935 "Message received from %s is too short... Ignoring %u.",
4949 if (message_header->
magic == 0xFFFF) {
4953 guessed_str =
"Corosync 2.2";
4954 }
else if (message_header->
magic == 0xFEFE) {
4958 guessed_str =
"Corosync 2.3+";
4959 }
else if (msg_byte[0] == 0x01) {
4963 guessed_str =
"unencrypted Kronosnet";
4964 }
else if (msg_byte[0] >= 0 && msg_byte[0] <= 5) {
4969 guessed_str =
"unencrypted Corosync 2.0/2.1/1.x/OpenAIS";
4980 guessed_str =
"encrypted Kronosnet/Corosync 2.0/2.1/1.x/OpenAIS or unknown";
4984 "Message received from %s has bad magic number (probably sent by %s).. Ignoring",
4993 "Message received from %s has unsupported version %u... Ignoring",
5007 unsigned int msg_len,
5013 if (check_message_header_validity(context, msg, msg_len,
system_from) == -1) {
5017 switch (message_header->
type) {
5038 "Message received from %s has wrong type... ignoring %d.\n",
5040 (
int)message_header->
type);
5058 unsigned short ip_port,
5059 unsigned int iface_no)
5079 unsigned int iface_no)
5103 "Created or loaded sequence id " CS_PRI_RING_ID " for this ring.",
5137 void (*totem_service_ready) (
void))
5207 timer_function_orf_token_timeout(context);