corosync  3.0.3
totemsrp.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2003-2006 MontaVista Software, Inc.
3  * Copyright (c) 2006-2018 Red Hat, Inc.
4  *
5  * All rights reserved.
6  *
7  * Author: Steven Dake (sdake@redhat.com)
8  *
9  * This software licensed under BSD license, the text of which follows:
10  *
11  * Redistribution and use in source and binary forms, with or without
12  * modification, are permitted provided that the following conditions are met:
13  *
14  * - Redistributions of source code must retain the above copyright notice,
15  * this list of conditions and the following disclaimer.
16  * - Redistributions in binary form must reproduce the above copyright notice,
17  * this list of conditions and the following disclaimer in the documentation
18  * and/or other materials provided with the distribution.
19  * - Neither the name of the MontaVista Software, Inc. nor the names of its
20  * contributors may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33  * THE POSSIBILITY OF SUCH DAMAGE.
34  */
35 
36 /*
37  * The first version of this code was based upon Yair Amir's PhD thesis:
38  * http://www.cs.jhu.edu/~yairamir/phd.ps (ch4,5).
39  *
40  * The current version of totemsrp implements the Totem protocol specified in:
41  * http://citeseer.ist.psu.edu/amir95totem.html
42  *
43  * The deviations from the above published protocols are:
44  * - token hold mode where token doesn't rotate on unused ring - reduces cpu
45  * usage on 1.6ghz xeon from 35% to less than 0.1% as measured by top
46  */
47 
48 #include <config.h>
49 
50 #include <assert.h>
51 #ifdef HAVE_ALLOCA_H
52 #include <alloca.h>
53 #endif
54 #include <sys/mman.h>
55 #include <sys/types.h>
56 #include <sys/stat.h>
57 #include <sys/socket.h>
58 #include <netdb.h>
59 #include <sys/un.h>
60 #include <sys/ioctl.h>
61 #include <sys/param.h>
62 #include <netinet/in.h>
63 #include <arpa/inet.h>
64 #include <unistd.h>
65 #include <fcntl.h>
66 #include <stdlib.h>
67 #include <stdio.h>
68 #include <errno.h>
69 #include <sched.h>
70 #include <time.h>
71 #include <sys/time.h>
72 #include <sys/poll.h>
73 #include <sys/uio.h>
74 #include <limits.h>
75 
76 #include <qb/qblist.h>
77 #include <qb/qbdefs.h>
78 #include <qb/qbutil.h>
79 #include <qb/qbloop.h>
80 
81 #include <corosync/swab.h>
82 #include <corosync/sq.h>
83 
84 #define LOGSYS_UTILS_ONLY 1
85 #include <corosync/logsys.h>
86 
87 #include "totemsrp.h"
88 #include "totemnet.h"
89 
90 #include "cs_queue.h"
91 
92 #define LOCALHOST_IP inet_addr("127.0.0.1")
93 #define QUEUE_RTR_ITEMS_SIZE_MAX 16384 /* allow 16384 retransmit items; also sizes the regular and recovery sort queues */
94 #define RETRANS_MESSAGE_QUEUE_SIZE_MAX 16384 /* allow 16384 messages to be queued for retransmission */
95 #define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500 /* allow 500 messages to be queued */
96 #define MAXIOVS 5
97 #define RETRANSMIT_ENTRIES_MAX 30
98 #define TOKEN_SIZE_MAX 64000 /* bytes */
99 #define LEAVE_DUMMY_NODEID 0
100 
101 /*
102  * SRP address: identifies a processor (node) taking part in the
 * membership algorithm.
 *
 * Only the node id is carried; srp_addr_equal() compares two addresses
 * by nodeid alone.
103  */
104 struct srp_addr {
105  unsigned int nodeid; /* node id of the processor */
106 };
107 
108 /*
109  * Rollover handling:
110  * SEQNO_START_MSG is the starting sequence number after a new configuration
111  * This should remain zero, unless testing overflow in which case
112  * 0x7ffff000 and 0xfffff000 are good starting values.
113  *
114  * SEQNO_START_TOKEN is the starting sequence number after a new configuration
115  * for a token. This should remain zero, unless testing overflow in which
116  * case 0x7fffff00 or 0xffffff00 are good starting values.
117  */
118 #define SEQNO_START_MSG 0x0
119 #define SEQNO_START_TOKEN 0x0
120 
121 /*
122  * These can be used to test different rollover points
123  * #define SEQNO_START_MSG 0xfffffe00
124  * #define SEQNO_START_TOKEN 0xfffffe00
125  */
126 
127 /*
128  * These can be used to test the error recovery algorithms
129  * #define TEST_DROP_ORF_TOKEN_PERCENTAGE 30
130  * #define TEST_DROP_COMMIT_TOKEN_PERCENTAGE 30
131  * #define TEST_DROP_MCAST_PERCENTAGE 50
132  * #define TEST_RECOVERY_MSG_COUNT 300
133  */
134 
135 /*
136  * we compare incoming messages to determine if their endian is
137  * different - if so convert them
138  *
139  * do not change
140  */
141 #define ENDIAN_LOCAL 0xff22
142 
144  MESSAGE_TYPE_ORF_TOKEN = 0, /* Ordering, Reliability, Flow (ORF) control Token */
145  MESSAGE_TYPE_MCAST = 1, /* ring ordered multicast message */
146  MESSAGE_TYPE_MEMB_MERGE_DETECT = 2, /* merge rings if there are available rings */
147  MESSAGE_TYPE_MEMB_JOIN = 3, /* membership join message */
148  MESSAGE_TYPE_MEMB_COMMIT_TOKEN = 4, /* membership commit token */
149  MESSAGE_TYPE_TOKEN_HOLD_CANCEL = 5, /* cancel the holding of the token */
150 };
151 
155 };
156 
157 /*
158  * New membership algorithm local variables
159  */
161  struct srp_addr addr;
162  int set;
163 };
164 
165 
167  struct qb_list_head list;
168  int (*callback_fn) (enum totem_callback_token_type type, const void *);
170  int delete;
171  void *data;
172 };
173 
174 
176  int mcast;
177  int token;
178 };
179 
180 struct mcast {
183  unsigned int seq;
186  unsigned int node_id;
188 } __attribute__((packed));
189 
190 
191 struct rtr_item {
193  unsigned int seq;
194 }__attribute__((packed));
195 
196 
197 struct orf_token {
199  unsigned int seq;
200  unsigned int token_seq;
201  unsigned int aru;
202  unsigned int aru_addr;
204  unsigned int backlog;
205  unsigned int fcc;
208  struct rtr_item rtr_list[0];
209 }__attribute__((packed));
210 
211 
212 struct memb_join {
215  unsigned int proc_list_entries;
216  unsigned int failed_list_entries;
217  unsigned long long ring_seq;
218  unsigned char end_of_memb_join[0];
219 /*
220  * These parts of the data structure are dynamic:
221  * struct srp_addr proc_list[];
222  * struct srp_addr failed_list[];
223  */
224 } __attribute__((packed));
225 
226 
231 } __attribute__((packed));
232 
233 
237 } __attribute__((packed));
238 
239 
242  unsigned int aru;
243  unsigned int high_delivered;
244  unsigned int received_flg;
245 }__attribute__((packed));
246 
247 
250  unsigned int token_seq;
252  unsigned int retrans_flg;
255  unsigned char end_of_commit_token[0];
256 /*
257  * These parts of the data structure are dynamic:
258  *
259  * struct srp_addr addr[PROCESSOR_COUNT_MAX];
260  * struct memb_commit_token_memb_entry memb_list[PROCESSOR_COUNT_MAX];
261  */
262 }__attribute__((packed));
263 
/*
 * Element type of the cs_queue message queues (new_message_queue,
 * new_message_queue_trans, retrans_message_queue): a multicast message and
 * its length (presumably in bytes — confirm against the enqueue sites).
 */
264 struct message_item {
265  struct mcast *mcast;
266  unsigned int msg_len;
267 };
268 
270  struct mcast *mcast;
271  unsigned int msg_len;
272 };
273 
279 };
280 
283 
285 
286  /*
287  * Flow control mcasts and remcasts on last and current orf_token
288  */
290 
292 
294 
296 
298 
300 
301  struct srp_addr my_id;
302 
304 
306 
308 
310 
312 
314 
316 
318 
320 
322 
324 
326 
328 
330 
332 
334 
336 
338 
340 
342 
344 
345  unsigned int my_last_aru;
346 
348 
350 
351  unsigned int my_high_seq_received;
352 
353  unsigned int my_install_seq;
354 
356 
358 
360 
362 
364 
365  /*
366  * Queues used to order, deliver, and recover messages
367  */
369 
371 
373 
375 
377 
378  /*
379  * Received up to and including
380  */
381  unsigned int my_aru;
382 
383  unsigned int my_high_delivered;
384 
385  struct qb_list_head token_callback_received_listhead;
386 
387  struct qb_list_head token_callback_sent_listhead;
388 
390 
392 
393  unsigned int my_token_seq;
394 
395  /*
396  * Timers
397  */
398  qb_loop_timer_handle timer_pause_timeout;
399 
400  qb_loop_timer_handle timer_orf_token_timeout;
401 
402  qb_loop_timer_handle timer_orf_token_warning;
403 
405 
407 
408  qb_loop_timer_handle timer_merge_detect_timeout;
409 
411 
413 
414  qb_loop_timer_handle memb_timer_state_commit_timeout;
415 
416  qb_loop_timer_handle timer_heartbeat_timeout;
417 
418  /*
419  * Function and data used to log messages
420  */
422 
424 
426 
428 
430 
432 
434 
436  int level,
437  int subsys,
438  const char *function,
439  const char *file,
440  int line,
441  const char *format, ...)__attribute__((format(printf, 6, 7)));;
442 
444 
445 //TODO struct srp_addr next_memb;
446 
448 
450 
452  unsigned int nodeid,
453  const void *msg,
454  unsigned int msg_len,
455  int endian_conversion_required);
456 
458  enum totem_configuration_type configuration_type,
459  const unsigned int *member_list, size_t member_list_entries,
460  const unsigned int *left_list, size_t left_list_entries,
461  const unsigned int *joined_list, size_t joined_list_entries,
462  const struct memb_ring_id *ring_id);
463 
465 
467  int waiting_trans_ack);
468 
470  struct memb_ring_id *memb_ring_id,
471  unsigned int nodeid);
472 
474  const struct memb_ring_id *memb_ring_id,
475  unsigned int nodeid);
476 
478 
480 
481  unsigned long long token_ring_id_seq;
482 
483  unsigned int last_released;
484 
485  unsigned int set_aru;
486 
488 
490 
492 
493  unsigned int my_last_seq;
494 
495  struct timeval tv_old;
496 
498 
500 
501  unsigned int use_heartbeat;
502 
503  unsigned int my_trc;
504 
505  unsigned int my_pbl;
506 
507  unsigned int my_cbl;
508 
509  uint64_t pause_timestamp;
510 
512 
514 
516 
518 
520 
522 
523  int flushing;
524 
527  char commit_token_storage[40000];
528 };
529 
531  int count;
532  int (*handler_functions[6]) (
533  struct totemsrp_instance *instance,
534  const void *msg,
535  size_t msg_len,
536  int endian_conversion_needed);
537 };
538 
557 };
558 
559 const char* gather_state_from_desc [] = {
560  [TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT] = "consensus timeout",
561  [TOTEMSRP_GSFROM_GATHER_MISSING1] = "MISSING",
562  [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE] = "The token was lost in the OPERATIONAL state.",
563  [TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED] = "The consensus timeout expired.",
564  [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE] = "The token was lost in the COMMIT state.",
565  [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE] = "The token was lost in the RECOVERY state.",
566  [TOTEMSRP_GSFROM_FAILED_TO_RECEIVE] = "failed to receive",
567  [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE] = "foreign message in operational state",
568  [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE] = "foreign message in gather state",
569  [TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE] = "merge during operational state",
570  [TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE] = "merge during gather state",
571  [TOTEMSRP_GSFROM_MERGE_DURING_JOIN] = "merge during join",
572  [TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE] = "join during operational state",
573  [TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE] = "join during commit state",
574  [TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY] = "join during recovery",
575  [TOTEMSRP_GSFROM_INTERFACE_CHANGE] = "interface change",
576 };
577 
578 /*
579  * forward decls
580  */
581 static int message_handler_orf_token (
582  struct totemsrp_instance *instance,
583  const void *msg,
584  size_t msg_len,
585  int endian_conversion_needed);
586 
587 static int message_handler_mcast (
588  struct totemsrp_instance *instance,
589  const void *msg,
590  size_t msg_len,
591  int endian_conversion_needed);
592 
593 static int message_handler_memb_merge_detect (
594  struct totemsrp_instance *instance,
595  const void *msg,
596  size_t msg_len,
597  int endian_conversion_needed);
598 
599 static int message_handler_memb_join (
600  struct totemsrp_instance *instance,
601  const void *msg,
602  size_t msg_len,
603  int endian_conversion_needed);
604 
605 static int message_handler_memb_commit_token (
606  struct totemsrp_instance *instance,
607  const void *msg,
608  size_t msg_len,
609  int endian_conversion_needed);
610 
611 static int message_handler_token_hold_cancel (
612  struct totemsrp_instance *instance,
613  const void *msg,
614  size_t msg_len,
615  int endian_conversion_needed);
616 
617 static void totemsrp_instance_initialize (struct totemsrp_instance *instance);
618 
619 static void srp_addr_to_nodeid (
620  struct totemsrp_instance *instance,
621  unsigned int *nodeid_out,
622  struct srp_addr *srp_addr_in,
623  unsigned int entries);
624 
625 static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b);
626 
627 static void memb_leave_message_send (struct totemsrp_instance *instance);
628 
629 static void token_callbacks_execute (struct totemsrp_instance *instance, enum totem_callback_token_type type);
630 static void memb_state_gather_enter (struct totemsrp_instance *instance, enum gather_state_from gather_from);
631 static void messages_deliver_to_app (struct totemsrp_instance *instance, int skip, unsigned int end_point);
632 static int orf_token_mcast (struct totemsrp_instance *instance, struct orf_token *oken,
633  int fcc_mcasts_allowed);
634 static void messages_free (struct totemsrp_instance *instance, unsigned int token_aru);
635 
636 static void memb_ring_id_set (struct totemsrp_instance *instance,
637  const struct memb_ring_id *ring_id);
638 static void target_set_completed (void *context);
639 static void memb_state_commit_token_update (struct totemsrp_instance *instance);
640 static void memb_state_commit_token_target_set (struct totemsrp_instance *instance);
641 static int memb_state_commit_token_send (struct totemsrp_instance *instance);
642 static int memb_state_commit_token_send_recovery (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token);
643 static void memb_state_commit_token_create (struct totemsrp_instance *instance);
644 static int token_hold_cancel_send (struct totemsrp_instance *instance);
645 static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out);
646 static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out);
647 static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out);
648 static void mcast_endian_convert (const struct mcast *in, struct mcast *out);
649 static void memb_merge_detect_endian_convert (
650  const struct memb_merge_detect *in,
651  struct memb_merge_detect *out);
652 static struct srp_addr srp_addr_endian_convert (struct srp_addr in);
653 static void timer_function_orf_token_timeout (void *data);
654 static void timer_function_orf_token_warning (void *data);
655 static void timer_function_pause_timeout (void *data);
656 static void timer_function_heartbeat_timeout (void *data);
657 static void timer_function_token_retransmit_timeout (void *data);
658 static void timer_function_token_hold_retransmit_timeout (void *data);
659 static void timer_function_merge_detect_timeout (void *data);
660 static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance);
661 static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr);
662 static const char* gsfrom_to_msg(enum gather_state_from gsfrom);
663 
664 void main_deliver_fn (
665  void *context,
666  const void *msg,
667  unsigned int msg_len,
668  const struct sockaddr_storage *system_from);
669 
671  void *context,
672  const struct totem_ip_address *iface_address,
673  unsigned int iface_no);
674 
676  6,
677  {
678  message_handler_orf_token, /* MESSAGE_TYPE_ORF_TOKEN */
679  message_handler_mcast, /* MESSAGE_TYPE_MCAST */
680  message_handler_memb_merge_detect, /* MESSAGE_TYPE_MEMB_MERGE_DETECT */
681  message_handler_memb_join, /* MESSAGE_TYPE_MEMB_JOIN */
682  message_handler_memb_commit_token, /* MESSAGE_TYPE_MEMB_COMMIT_TOKEN */
683  message_handler_token_hold_cancel /* MESSAGE_TYPE_TOKEN_HOLD_CANCEL */
684  }
685 };
686 
/*
 * Log through the instance's configured logger, tagging the message with the
 * caller's function, file and line.  Expects a variable named `instance`
 * (struct totemsrp_instance *) to be in scope where the macro is expanded.
 *
 * The expansion is a single do { } while (0) statement with NO trailing
 * semicolon: the caller's own ';' terminates it, so the macro is safe as the
 * body of an unbraced if/else.  (With a ';' inside the macro,
 * "if (c) log_printf (...); else ..." failed to compile.)
 */
#define log_printf(level, format, args...) \
do { \
	instance->totemsrp_log_printf ( \
		level, instance->totemsrp_subsys_id, \
		__FUNCTION__, __FILE__, __LINE__, \
		format, ##args); \
} while (0)

/*
 * Like log_printf(), but appends the description of error number err_num
 * (obtained with qb_strerror_r) and the number itself to the message.
 *
 * err_num is evaluated exactly once and captured before qb_strerror_r runs,
 * so passing `errno` reports the original value even if the string lookup
 * itself modifies errno.
 */
#define LOGSYS_PERROR(err_num, level, fmt, args...) \
do { \
	const int _err_num = (err_num); \
	char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \
	const char *_error_ptr = qb_strerror_r(_err_num, _error_str, sizeof(_error_str)); \
	instance->totemsrp_log_printf ( \
		level, instance->totemsrp_subsys_id, \
		__FUNCTION__, __FILE__, __LINE__, \
		fmt ": %s (%d)\n", ##args, _error_ptr, _err_num); \
} while (0)
703 
704 static const char* gsfrom_to_msg(enum gather_state_from gsfrom)
705 {
706  if (gsfrom <= TOTEMSRP_GSFROM_MAX) {
707  return gather_state_from_desc[gsfrom];
708  }
709  else {
710  return "UNKNOWN";
711  }
712 }
713 
/*
 * Put a freshly allocated instance into its initial state.
 *
 * Zeroes the whole structure, initialises the token-received and token-sent
 * callback lists, then sets the fields whose starting value is not zero.
 * NOTE(review): some initialisation lines appear to be missing from this
 * copy of the function; compare with upstream before relying on it.
 */
714 static void totemsrp_instance_initialize (struct totemsrp_instance *instance)
715 {
716  memset (instance, 0, sizeof (struct totemsrp_instance));
717 
718  qb_list_init (&instance->token_callback_received_listhead);
719 
720  qb_list_init (&instance->token_callback_sent_listhead);
721 
722  instance->my_received_flg = 1;
723 
 /* one before the first token sequence number */
724  instance->my_token_seq = SEQNO_START_TOKEN - 1;
725 
727 
 /* set_aru is unsigned, so -1 stores UINT_MAX */
728  instance->set_aru = -1;
729 
730  instance->my_aru = SEQNO_START_MSG;
731 
733 
735 
 /* already zero after memset(); kept explicit for readability */
736  instance->orf_token_discard = 0;
737 
738  instance->originated_orf_token = 0;
739 
 /* the commit token lives in the instance's inline commit_token_storage buffer */
740  instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage;
741 
 /* matches the totemsrp_waiting_trans_ack_cb_fn (1) call made after initialisation */
742  instance->waiting_trans_ack = 1;
743 }
744 
/*
 * Detect a long pause of this process and, if one is found, flush pending
 * membership messages.
 *
 * A pause is assumed when more than token_timeout / 2 ms have elapsed since
 * instance->pause_timestamp.  Returns 0 when no pause is detected.
 * Otherwise it returns the result of the last call inside the do/while
 * loop, which is repeated while it returns -1.
 *
 * NOTE(review): the log_printf line and the call inside the loop are
 * missing from this copy of the file; the loop body is described only from
 * the surrounding comment and log text — confirm against upstream.
 * NOTE(review): the elapsed time is printed with "%d" but passed as an
 * unsigned int; "%u" would match the argument type.
 */
745 static int pause_flush (struct totemsrp_instance *instance)
746 {
747  uint64_t now_msec;
748  uint64_t timestamp_msec;
749  int res = 0;
750 
 /* both values are converted from nanoseconds to milliseconds */
751  now_msec = (qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC);
752  timestamp_msec = instance->pause_timestamp / QB_TIME_NS_IN_MSEC;
753 
754  if ((now_msec - timestamp_msec) > (instance->totem_config->token_timeout / 2)) {
756  "Process pause detected for %d ms, flushing membership messages.", (unsigned int)(now_msec - timestamp_msec));
757  /*
758  * -1 indicates an error from recvmsg; retry until the flush call
 * returns something else
759  */
760  do {
762  } while (res == -1);
763  }
764  return (res);
765 }
766 
/*
 * Token callback that records token timing statistics.
 *
 * It is registered for both the token-received and the token-sent events
 * by totemsrp_initialize.  Timestamps (milliseconds, truncated to uint32_t)
 * are kept in the ring buffer instance->stats.token[] of
 * TOTEM_TOKEN_STATS_MAX entries, bounded by stats.earliest_token and
 * stats.latest_token.
 *
 * NOTE(review): the line opening the first branch is missing from this
 * copy; presumably it selects the token-received case (which allocates a
 * new slot and stores rx), while the else branch stores tx into the
 * current slot — confirm against upstream.
 *
 * Always returns 0.
 */
767 static int token_event_stats_collector (enum totem_callback_token_type type, const void *void_instance)
768 {
769  struct totemsrp_instance *instance = (struct totemsrp_instance *)void_instance;
770  uint32_t time_now;
771  unsigned long long nano_secs = qb_util_nano_current_get ();
772 
773  time_now = (nano_secs / QB_TIME_NS_IN_MSEC);
774 
776  /* advance the latest_token index, wrapping around the ring buffer */
777  if (instance->stats.latest_token == (TOTEM_TOKEN_STATS_MAX - 1))
778  instance->stats.latest_token = 0;
779  else
780  instance->stats.latest_token++;
781 
782  if (instance->stats.earliest_token == instance->stats.latest_token) {
783  /* the ring buffer is full: drop the oldest slot and clear it for reuse */
784  if (instance->stats.earliest_token == (TOTEM_TOKEN_STATS_MAX - 1))
785  instance->stats.earliest_token = 0;
786  else
787  instance->stats.earliest_token++;
788 
789  instance->stats.token[instance->stats.earliest_token].rx = 0;
790  instance->stats.token[instance->stats.earliest_token].tx = 0;
791  instance->stats.token[instance->stats.earliest_token].backlog_calc = 0;
792  }
793 
794  instance->stats.token[instance->stats.latest_token].rx = time_now;
795  instance->stats.token[instance->stats.latest_token].tx = 0; /* in case we drop the token */
796  } else {
797  instance->stats.token[instance->stats.latest_token].tx = time_now;
798  }
799  return 0;
800 }
801 
/*
 * MTU-change callback handed to totemnet_initialize.
 *
 * Stores in totem_config->net_mtu the new network MTU minus room for two
 * struct mcast headers, then logs both values (the log_printf line opening
 * the call is missing from this copy).
 */
802 static void totempg_mtu_changed(void *context, int net_mtu)
803 {
804  struct totemsrp_instance *instance = context;
805 
 /*
 * NOTE(review): 2 * sizeof (struct mcast) is a size_t, so this subtraction
 * is done in unsigned arithmetic; a net_mtu smaller than that would wrap
 * before being converted back to the type of totem_config->net_mtu.
 */
806  instance->totem_config->net_mtu = net_mtu - 2 * sizeof (struct mcast);
807 
809  "Net MTU changed to %d, new value is %d",
810  net_mtu, instance->totem_config->net_mtu);
811 }
812 
813 /*
814  * Exported interfaces
815  */
817  qb_loop_t *poll_handle,
818  void **srp_context,
819  struct totem_config *totem_config,
820  totempg_stats_t *stats,
821 
822  void (*deliver_fn) (
823  unsigned int nodeid,
824  const void *msg,
825  unsigned int msg_len,
826  int endian_conversion_required),
827 
828  void (*confchg_fn) (
829  enum totem_configuration_type configuration_type,
830  const unsigned int *member_list, size_t member_list_entries,
831  const unsigned int *left_list, size_t left_list_entries,
832  const unsigned int *joined_list, size_t joined_list_entries,
833  const struct memb_ring_id *ring_id),
834  void (*waiting_trans_ack_cb_fn) (
835  int waiting_trans_ack))
836 {
837  struct totemsrp_instance *instance;
838  int res;
839 
840  instance = malloc (sizeof (struct totemsrp_instance));
841  if (instance == NULL) {
842  goto error_exit;
843  }
844 
845  totemsrp_instance_initialize (instance);
846 
847  instance->totemsrp_waiting_trans_ack_cb_fn = waiting_trans_ack_cb_fn;
848  instance->totemsrp_waiting_trans_ack_cb_fn (1);
849 
850  stats->srp = &instance->stats;
851  instance->stats.latest_token = 0;
852  instance->stats.earliest_token = 0;
853 
854  instance->totem_config = totem_config;
855 
856  /*
857  * Configure logging
858  */
867 
868  /*
869  * Configure totem store and load functions
870  */
873 
874  /*
875  * Initialize local variables for totemsrp
876  */
878 
879  /*
880  * Display totem configuration
881  */
883  "Token Timeout (%d ms) retransmit timeout (%d ms)",
886  uint32_t token_warning_ms = totem_config->token_warning * totem_config->token_timeout / 100;
888  "Token warning every %d ms (%d%% of Token Timeout)",
889  token_warning_ms, totem_config->token_warning);
890  if (token_warning_ms < totem_config->token_retransmit_timeout)
892  "The token warning interval (%d ms) is less than the token retransmit timeout (%d ms) "
893  "which can lead to spurious token warnings. Consider increasing the token_warning parameter.",
894  token_warning_ms, totem_config->token_retransmit_timeout);
895  } else {
897  "Token warnings disabled");
898  }
900  "token hold (%d ms) retransmits before loss (%d retrans)",
903  "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)",
907 
910  "downcheck (%d ms) fail to recv const (%d msgs)",
913  "seqno unchanged const (%d rotations) Maximum network MTU %d", totem_config->seqno_unchanged_const, totem_config->net_mtu);
914 
916  "window size per rotation (%d messages) maximum messages per rotation (%d messages)",
918 
920  "missed count const (%d messages)",
922 
924  "send threads (%d threads)", totem_config->threads);
925 
927  "heartbeat_failures_allowed (%d)", totem_config->heartbeat_failures_allowed);
929  "max_network_delay (%d ms)", totem_config->max_network_delay);
930 
931 
932  cs_queue_init (&instance->retrans_message_queue, RETRANS_MESSAGE_QUEUE_SIZE_MAX,
933  sizeof (struct message_item), instance->threaded_mode_enabled);
934 
935  sq_init (&instance->regular_sort_queue,
936  QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
937 
938  sq_init (&instance->recovery_sort_queue,
939  QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
940 
941  instance->totemsrp_poll_handle = poll_handle;
942 
943  instance->totemsrp_deliver_fn = deliver_fn;
944 
945  instance->totemsrp_confchg_fn = confchg_fn;
946  instance->use_heartbeat = 1;
947 
948  timer_function_pause_timeout (instance);
949 
952  "HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0");
953  instance->use_heartbeat = 0;
954  }
955 
956  if (instance->use_heartbeat) {
957  instance->heartbeat_timeout
960 
961  if (instance->heartbeat_timeout >= totem_config->token_timeout) {
963  "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)",
964  instance->heartbeat_timeout,
967  "heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay");
969  "heartbeat timeout should be less than the token timeout. Heartbeat is disabled!!");
970  instance->use_heartbeat = 0;
971  }
972  else {
974  "total heartbeat_timeout (%d ms)", instance->heartbeat_timeout);
975  }
976  }
977 
978  res = totemnet_initialize (
979  poll_handle,
980  &instance->totemnet_context,
981  totem_config,
982  stats->srp,
983  instance,
986  totempg_mtu_changed,
987  target_set_completed);
988  if (res == -1) {
989  goto error_exit;
990  }
991 
992  instance->my_id.nodeid = instance->totem_config->interfaces[instance->lowest_active_if].boundto.nodeid;
993 
994  /*
995  * Must have net_mtu adjusted by totemnet_initialize first
996  */
997  cs_queue_init (&instance->new_message_queue,
999  sizeof (struct message_item), instance->threaded_mode_enabled);
1000 
1001  cs_queue_init (&instance->new_message_queue_trans,
1003  sizeof (struct message_item), instance->threaded_mode_enabled);
1004 
1006  &instance->token_recv_event_handle,
1008  0,
1009  token_event_stats_collector,
1010  instance);
1012  &instance->token_sent_event_handle,
1014  0,
1015  token_event_stats_collector,
1016  instance);
1017  *srp_context = instance;
1018  return (0);
1019 
1020 error_exit:
1021  return (-1);
1022 }
1023 
1025  void *srp_context)
1026 {
1027  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1028 
1029  memb_leave_message_send (instance);
1030  totemnet_finalize (instance->totemnet_context);
1031  cs_queue_free (&instance->new_message_queue);
1032  cs_queue_free (&instance->new_message_queue_trans);
1033  cs_queue_free (&instance->retrans_message_queue);
1034  sq_free (&instance->regular_sort_queue);
1035  sq_free (&instance->recovery_sort_queue);
1036  free (instance);
1037 }
1038 
1039 /*
1040  * Return configured interfaces. interfaces is array of totem_ip addresses allocated by caller,
1041  * with interfaces_size number of items. iface_count is final number of interfaces filled by this
1042  * function.
1043  *
1044  * Function returns 0 on success, otherwise if interfaces array is not big enough, -2 is returned,
1045  * and if interface was not found, -1 is returned.
1046  */
1048  void *srp_context,
1049  unsigned int nodeid,
1050  unsigned int *interface_id,
1051  struct totem_ip_address *interfaces,
1052  unsigned int interfaces_size,
1053  char ***status,
1054  unsigned int *iface_count)
1055 {
1056  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1057  struct totem_ip_address *iface_ptr = interfaces;
1058  int res = 0;
1059  int i,n;
1060  int num_ifs = 0;
1061 
1062  memset(interfaces, 0, sizeof(struct totem_ip_address) * interfaces_size);
1063  *iface_count = INTERFACE_MAX;
1064 
1065  for (i=0; i<INTERFACE_MAX; i++) {
1066  for (n=0; n < instance->totem_config->interfaces[i].member_count; n++) {
1067  if (instance->totem_config->interfaces[i].configured &&
1068  instance->totem_config->interfaces[i].member_list[n].nodeid == nodeid) {
1069  memcpy(iface_ptr, &instance->totem_config->interfaces[i].member_list[n], sizeof(struct totem_ip_address));
1070  interface_id[num_ifs] = i;
1071  iface_ptr++;
1072  if (++num_ifs > interfaces_size) {
1073  res = -2;
1074  break;
1075  }
1076  }
1077  }
1078  }
1079 
1080  totemnet_ifaces_get(instance->totemnet_context, status, iface_count);
1081  *iface_count = num_ifs;
1082  return (res);
1083 }
1084 
1086  void *srp_context,
1087  const char *cipher_type,
1088  const char *hash_type)
1089 {
1090  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1091  int res;
1092 
1093  res = totemnet_crypto_set(instance->totemnet_context, cipher_type, hash_type);
1094 
1095  return (res);
1096 }
1097 
1098 
1100  void *srp_context)
1101 {
1102  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1103  unsigned int res;
1104 
1105  res = instance->my_id.nodeid;
1106 
1107  return (res);
1108 }
1109 
1111  void *srp_context)
1112 {
1113  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1114  int res;
1115 
1116  res = instance->totem_config->interfaces[instance->lowest_active_if].boundto.family;
1117 
1118  return (res);
1119 }
1120 
1121 
1122 /*
1123  * Set operations for use by the membership algorithm
1124  */
1125 static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b)
1126 {
1127  if (a->nodeid == b->nodeid) {
1128  return 1;
1129  }
1130  return 0;
1131 }
1132 
1133 static void srp_addr_to_nodeid (
1134  struct totemsrp_instance *instance,
1135  unsigned int *nodeid_out,
1136  struct srp_addr *srp_addr_in,
1137  unsigned int entries)
1138 {
1139  unsigned int i;
1140 
1141  for (i = 0; i < entries; i++) {
1142  nodeid_out[i] = srp_addr_in[i].nodeid;
1143  }
1144 }
1145 
1146 static struct srp_addr srp_addr_endian_convert (struct srp_addr in)
1147 {
1148  struct srp_addr res;
1149 
1150  res.nodeid = swab32 (in.nodeid);
1151 
1152  return (res);
1153 }
1154 
1155 static void memb_consensus_reset (struct totemsrp_instance *instance)
1156 {
1157  instance->consensus_list_entries = 0;
1158 }
1159 
1160 static void memb_set_subtract (
1161  struct srp_addr *out_list, int *out_list_entries,
1162  struct srp_addr *one_list, int one_list_entries,
1163  struct srp_addr *two_list, int two_list_entries)
1164 {
1165  int found = 0;
1166  int i;
1167  int j;
1168 
1169  *out_list_entries = 0;
1170 
1171  for (i = 0; i < one_list_entries; i++) {
1172  for (j = 0; j < two_list_entries; j++) {
1173  if (srp_addr_equal (&one_list[i], &two_list[j])) {
1174  found = 1;
1175  break;
1176  }
1177  }
1178  if (found == 0) {
1179  out_list[*out_list_entries] = one_list[i];
1180  *out_list_entries = *out_list_entries + 1;
1181  }
1182  found = 0;
1183  }
1184 }
1185 
1186 /*
1187  * Set consensus for a specific processor
1188  */
1189 static void memb_consensus_set (
1190  struct totemsrp_instance *instance,
1191  const struct srp_addr *addr)
1192 {
1193  int found = 0;
1194  int i;
1195 
1196  for (i = 0; i < instance->consensus_list_entries; i++) {
1197  if (srp_addr_equal(addr, &instance->consensus_list[i].addr)) {
1198  found = 1;
1199  break; /* found entry */
1200  }
1201  }
1202  instance->consensus_list[i].addr = *addr;
1203  instance->consensus_list[i].set = 1;
1204  if (found == 0) {
1205  instance->consensus_list_entries++;
1206  }
1207  return;
1208 }
1209 
1210 /*
1211  * Is consensus set for a specific processor
1212  */
1213 static int memb_consensus_isset (
1214  struct totemsrp_instance *instance,
1215  const struct srp_addr *addr)
1216 {
1217  int i;
1218 
1219  for (i = 0; i < instance->consensus_list_entries; i++) {
1220  if (srp_addr_equal (addr, &instance->consensus_list[i].addr)) {
1221  return (instance->consensus_list[i].set);
1222  }
1223  }
1224  return (0);
1225 }
1226 
1227 /*
1228  * Is consensus agreed upon based upon consensus database
1229  */
1230 static int memb_consensus_agreed (
1231  struct totemsrp_instance *instance)
1232 {
1233  struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
1234  int token_memb_entries = 0;
1235  int agreed = 1;
1236  int i;
1237 
1238  memb_set_subtract (token_memb, &token_memb_entries,
1239  instance->my_proc_list, instance->my_proc_list_entries,
1240  instance->my_failed_list, instance->my_failed_list_entries);
1241 
1242  for (i = 0; i < token_memb_entries; i++) {
1243  if (memb_consensus_isset (instance, &token_memb[i]) == 0) {
1244  agreed = 0;
1245  break;
1246  }
1247  }
1248 
1249  if (agreed && instance->failed_to_recv == 1) {
1250  /*
1251  * Both nodes agreed on our failure. We don't care how many proc list items left because we
1252  * will create single ring anyway.
1253  */
1254 
1255  return (agreed);
1256  }
1257 
1258  assert (token_memb_entries >= 1);
1259 
1260  return (agreed);
1261 }
1262 
1263 static void memb_consensus_notset (
1264  struct totemsrp_instance *instance,
1265  struct srp_addr *no_consensus_list,
1266  int *no_consensus_list_entries,
1267  struct srp_addr *comparison_list,
1268  int comparison_list_entries)
1269 {
1270  int i;
1271 
1272  *no_consensus_list_entries = 0;
1273 
1274  for (i = 0; i < instance->my_proc_list_entries; i++) {
1275  if (memb_consensus_isset (instance, &instance->my_proc_list[i]) == 0) {
1276  no_consensus_list[*no_consensus_list_entries] = instance->my_proc_list[i];
1277  *no_consensus_list_entries = *no_consensus_list_entries + 1;
1278  }
1279  }
1280 }
1281 
1282 /*
1283  * Is set1 equal to set2 Entries can be in different orders
1284  */
1285 static int memb_set_equal (
1286  struct srp_addr *set1, int set1_entries,
1287  struct srp_addr *set2, int set2_entries)
1288 {
1289  int i;
1290  int j;
1291 
1292  int found = 0;
1293 
1294  if (set1_entries != set2_entries) {
1295  return (0);
1296  }
1297  for (i = 0; i < set2_entries; i++) {
1298  for (j = 0; j < set1_entries; j++) {
1299  if (srp_addr_equal (&set1[j], &set2[i])) {
1300  found = 1;
1301  break;
1302  }
1303  }
1304  if (found == 0) {
1305  return (0);
1306  }
1307  found = 0;
1308  }
1309  return (1);
1310 }
1311 
1312 /*
1313  * Is subset fully contained in fullset
1314  */
1315 static int memb_set_subset (
1316  const struct srp_addr *subset, int subset_entries,
1317  const struct srp_addr *fullset, int fullset_entries)
1318 {
1319  int i;
1320  int j;
1321  int found = 0;
1322 
1323  if (subset_entries > fullset_entries) {
1324  return (0);
1325  }
1326  for (i = 0; i < subset_entries; i++) {
1327  for (j = 0; j < fullset_entries; j++) {
1328  if (srp_addr_equal (&subset[i], &fullset[j])) {
1329  found = 1;
1330  }
1331  }
1332  if (found == 0) {
1333  return (0);
1334  }
1335  found = 0;
1336  }
1337  return (1);
1338 }
1339 /*
1340  * merge subset into fullset taking care not to add duplicates
1341  */
1342 static void memb_set_merge (
1343  const struct srp_addr *subset, int subset_entries,
1344  struct srp_addr *fullset, int *fullset_entries)
1345 {
1346  int found = 0;
1347  int i;
1348  int j;
1349 
1350  for (i = 0; i < subset_entries; i++) {
1351  for (j = 0; j < *fullset_entries; j++) {
1352  if (srp_addr_equal (&fullset[j], &subset[i])) {
1353  found = 1;
1354  break;
1355  }
1356  }
1357  if (found == 0) {
1358  fullset[*fullset_entries] = subset[i];
1359  *fullset_entries = *fullset_entries + 1;
1360  }
1361  found = 0;
1362  }
1363  return;
1364 }
1365 
1366 static void memb_set_and_with_ring_id (
1367  struct srp_addr *set1,
1368  struct memb_ring_id *set1_ring_ids,
1369  int set1_entries,
1370  struct srp_addr *set2,
1371  int set2_entries,
1372  struct memb_ring_id *old_ring_id,
1373  struct srp_addr *and,
1374  int *and_entries)
1375 {
1376  int i;
1377  int j;
1378  int found = 0;
1379 
1380  *and_entries = 0;
1381 
1382  for (i = 0; i < set2_entries; i++) {
1383  for (j = 0; j < set1_entries; j++) {
1384  if (srp_addr_equal (&set1[j], &set2[i])) {
1385  if (memcmp (&set1_ring_ids[j], old_ring_id, sizeof (struct memb_ring_id)) == 0) {
1386  found = 1;
1387  }
1388  break;
1389  }
1390  }
1391  if (found) {
1392  and[*and_entries] = set1[j];
1393  *and_entries = *and_entries + 1;
1394  }
1395  found = 0;
1396  }
1397  return;
1398 }
1399 
1400 static void memb_set_log(
1401  struct totemsrp_instance *instance,
1402  int level,
1403  const char *string,
1404  struct srp_addr *list,
1405  int list_entries)
1406 {
1407  char int_buf[32];
1408  char list_str[512];
1409  int i;
1410 
1411  memset(list_str, 0, sizeof(list_str));
1412 
1413  for (i = 0; i < list_entries; i++) {
1414  if (i == 0) {
1415  snprintf(int_buf, sizeof(int_buf), CS_PRI_NODE_ID, list[i].nodeid);
1416  } else {
1417  snprintf(int_buf, sizeof(int_buf), "," CS_PRI_NODE_ID, list[i].nodeid);
1418  }
1419 
1420  if (strlen(list_str) + strlen(int_buf) >= sizeof(list_str)) {
1421  break ;
1422  }
1423  strcat(list_str, int_buf);
1424  }
1425 
1426  log_printf(level, "List '%s' contains %d entries: %s", string, list_entries, list_str);
1427 }
1428 
1429 static void my_leave_memb_clear(
1430  struct totemsrp_instance *instance)
1431 {
1432  memset(instance->my_leave_memb_list, 0, sizeof(instance->my_leave_memb_list));
1433  instance->my_leave_memb_entries = 0;
1434 }
1435 
1436 static unsigned int my_leave_memb_match(
1437  struct totemsrp_instance *instance,
1438  unsigned int nodeid)
1439 {
1440  int i;
1441  unsigned int ret = 0;
1442 
1443  for (i = 0; i < instance->my_leave_memb_entries; i++){
1444  if (instance->my_leave_memb_list[i] == nodeid){
1445  ret = nodeid;
1446  break;
1447  }
1448  }
1449  return ret;
1450 }
1451 
1452 static void my_leave_memb_set(
1453  struct totemsrp_instance *instance,
1454  unsigned int nodeid)
1455 {
1456  int i, found = 0;
1457  for (i = 0; i < instance->my_leave_memb_entries; i++){
1458  if (instance->my_leave_memb_list[i] == nodeid){
1459  found = 1;
1460  break;
1461  }
1462  }
1463  if (found == 1) {
1464  return;
1465  }
1466  if (instance->my_leave_memb_entries < (PROCESSOR_COUNT_MAX - 1)) {
1467  instance->my_leave_memb_list[instance->my_leave_memb_entries] = nodeid;
1468  instance->my_leave_memb_entries++;
1469  } else {
1471  "Cannot set LEAVE nodeid=" CS_PRI_NODE_ID, nodeid);
1472  }
1473 }
1474 
1475 
1476 static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance)
1477 {
1478  assert (instance != NULL);
1479  return totemnet_buffer_alloc (instance->totemnet_context);
1480 }
1481 
1482 static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr)
1483 {
1484  assert (instance != NULL);
1485  totemnet_buffer_release (instance->totemnet_context, ptr);
1486 }
1487 
1488 static void reset_token_retransmit_timeout (struct totemsrp_instance *instance)
1489 {
1490  int32_t res;
1491 
1492  qb_loop_timer_del (instance->totemsrp_poll_handle,
1494  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1495  QB_LOOP_MED,
1496  instance->totem_config->token_retransmit_timeout*QB_TIME_NS_IN_MSEC,
1497  (void *)instance,
1498  timer_function_token_retransmit_timeout,
1499  &instance->timer_orf_token_retransmit_timeout);
1500  if (res != 0) {
1501  log_printf(instance->totemsrp_log_level_error, "reset_token_retransmit_timeout - qb_loop_timer_add error : %d", res);
1502  }
1503 
1504 }
1505 
1506 static void start_merge_detect_timeout (struct totemsrp_instance *instance)
1507 {
1508  int32_t res;
1509 
1510  if (instance->my_merge_detect_timeout_outstanding == 0) {
1511  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1512  QB_LOOP_MED,
1513  instance->totem_config->merge_timeout*QB_TIME_NS_IN_MSEC,
1514  (void *)instance,
1515  timer_function_merge_detect_timeout,
1516  &instance->timer_merge_detect_timeout);
1517  if (res != 0) {
1518  log_printf(instance->totemsrp_log_level_error, "start_merge_detect_timeout - qb_loop_timer_add error : %d", res);
1519  }
1520 
1522  }
1523 }
1524 
1525 static void cancel_merge_detect_timeout (struct totemsrp_instance *instance)
1526 {
1527  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_merge_detect_timeout);
1529 }
1530 
1531 /*
1532  * ring_state_* is used to save and restore the sort queue
1533  * state when a recovery operation fails (and enters gather)
1534  */
1535 static void old_ring_state_save (struct totemsrp_instance *instance)
1536 {
1537  if (instance->old_ring_state_saved == 0) {
1538  instance->old_ring_state_saved = 1;
1539  memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
1540  sizeof (struct memb_ring_id));
1541  instance->old_ring_state_aru = instance->my_aru;
1544  "Saving state aru %x high seq received %x",
1545  instance->my_aru, instance->my_high_seq_received);
1546  }
1547 }
1548 
1549 static void old_ring_state_restore (struct totemsrp_instance *instance)
1550 {
1551  instance->my_aru = instance->old_ring_state_aru;
1554  "Restoring instance->my_aru %x my high seq received %x",
1555  instance->my_aru, instance->my_high_seq_received);
1556 }
1557 
1558 static void old_ring_state_reset (struct totemsrp_instance *instance)
1559 {
1561  "Resetting old ring state");
1562  instance->old_ring_state_saved = 0;
1563 }
1564 
1565 static void reset_pause_timeout (struct totemsrp_instance *instance)
1566 {
1567  int32_t res;
1568 
1569  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_pause_timeout);
1570  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1571  QB_LOOP_MED,
1572  instance->totem_config->token_timeout * QB_TIME_NS_IN_MSEC / 5,
1573  (void *)instance,
1574  timer_function_pause_timeout,
1575  &instance->timer_pause_timeout);
1576  if (res != 0) {
1577  log_printf(instance->totemsrp_log_level_error, "reset_pause_timeout - qb_loop_timer_add error : %d", res);
1578  }
1579 }
1580 
1581 static void reset_token_warning (struct totemsrp_instance *instance) {
1582  int32_t res;
1583 
1584  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_warning);
1585  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1586  QB_LOOP_MED,
1587  instance->totem_config->token_warning * instance->totem_config->token_timeout / 100 * QB_TIME_NS_IN_MSEC,
1588  (void *)instance,
1589  timer_function_orf_token_warning,
1590  &instance->timer_orf_token_warning);
1591  if (res != 0) {
1592  log_printf(instance->totemsrp_log_level_error, "reset_token_warning - qb_loop_timer_add error : %d", res);
1593  }
1594 }
1595 
1596 static void reset_token_timeout (struct totemsrp_instance *instance) {
1597  int32_t res;
1598 
1599  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1600  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1601  QB_LOOP_MED,
1602  instance->totem_config->token_timeout*QB_TIME_NS_IN_MSEC,
1603  (void *)instance,
1604  timer_function_orf_token_timeout,
1605  &instance->timer_orf_token_timeout);
1606  if (res != 0) {
1607  log_printf(instance->totemsrp_log_level_error, "reset_token_timeout - qb_loop_timer_add error : %d", res);
1608  }
1609 
1610  if (instance->totem_config->token_warning)
1611  reset_token_warning(instance);
1612 }
1613 
1614 static void reset_heartbeat_timeout (struct totemsrp_instance *instance) {
1615  int32_t res;
1616 
1617  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
1618  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1619  QB_LOOP_MED,
1620  instance->heartbeat_timeout*QB_TIME_NS_IN_MSEC,
1621  (void *)instance,
1622  timer_function_heartbeat_timeout,
1623  &instance->timer_heartbeat_timeout);
1624  if (res != 0) {
1625  log_printf(instance->totemsrp_log_level_error, "reset_heartbeat_timeout - qb_loop_timer_add error : %d", res);
1626  }
1627 }
1628 
1629 
1630 static void cancel_token_warning (struct totemsrp_instance *instance) {
1631  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_warning);
1632 }
1633 
1634 static void cancel_token_timeout (struct totemsrp_instance *instance) {
1635  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1636 
1637  if (instance->totem_config->token_warning)
1638  cancel_token_warning(instance);
1639 }
1640 
1641 static void cancel_heartbeat_timeout (struct totemsrp_instance *instance) {
1642  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
1643 }
1644 
1645 static void cancel_token_retransmit_timeout (struct totemsrp_instance *instance)
1646 {
1647  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_retransmit_timeout);
1648 }
1649 
1650 static void start_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1651 {
1652  int32_t res;
1653 
1654  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1655  QB_LOOP_MED,
1656  instance->totem_config->token_hold_timeout*QB_TIME_NS_IN_MSEC,
1657  (void *)instance,
1658  timer_function_token_hold_retransmit_timeout,
1659  &instance->timer_orf_token_hold_retransmit_timeout);
1660  if (res != 0) {
1661  log_printf(instance->totemsrp_log_level_error, "start_token_hold_retransmit_timeout - qb_loop_timer_add error : %d", res);
1662  }
1663 }
1664 
1665 static void cancel_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1666 {
1667  qb_loop_timer_del (instance->totemsrp_poll_handle,
1669 }
1670 
1671 static void memb_state_consensus_timeout_expired (
1672  struct totemsrp_instance *instance)
1673 {
1674  struct srp_addr no_consensus_list[PROCESSOR_COUNT_MAX];
1675  int no_consensus_list_entries;
1676 
1677  instance->stats.consensus_timeouts++;
1678  if (memb_consensus_agreed (instance)) {
1679  memb_consensus_reset (instance);
1680 
1681  memb_consensus_set (instance, &instance->my_id);
1682 
1683  reset_token_timeout (instance); // REVIEWED
1684  } else {
1685  memb_consensus_notset (
1686  instance,
1687  no_consensus_list,
1688  &no_consensus_list_entries,
1689  instance->my_proc_list,
1690  instance->my_proc_list_entries);
1691 
1692  memb_set_merge (no_consensus_list, no_consensus_list_entries,
1693  instance->my_failed_list, &instance->my_failed_list_entries);
1694  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT);
1695  }
1696 }
1697 
/*
 * Forward declarations: both are used by the timer callbacks below and
 * defined further down in this file.
 */
static void memb_join_message_send (struct totemsrp_instance *instance);
static void memb_merge_detect_transmit (struct totemsrp_instance *instance);
1701 
1702 /*
1703  * Timers used for various states of the membership algorithm
1704  */
1705 static void timer_function_pause_timeout (void *data)
1706 {
1707  struct totemsrp_instance *instance = data;
1708 
1709  instance->pause_timestamp = qb_util_nano_current_get ();
1710  reset_pause_timeout (instance);
1711 }
1712 
1713 static void memb_recovery_state_token_loss (struct totemsrp_instance *instance)
1714 {
1715  old_ring_state_restore (instance);
1716  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE);
1717  instance->stats.recovery_token_lost++;
1718 }
1719 
1720 static void timer_function_orf_token_warning (void *data)
1721 {
1722  struct totemsrp_instance *instance = data;
1723  uint64_t tv_diff;
1724 
1725  /* need to protect against the case where token_warning is set to 0 dynamically */
1726  if (instance->totem_config->token_warning) {
1727  tv_diff = qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC -
1728  instance->stats.token[instance->stats.latest_token].rx;
1730  "Token has not been received in %d ms ", (unsigned int) tv_diff);
1731  reset_token_warning(instance);
1732  } else {
1733  cancel_token_warning(instance);
1734  }
1735 }
1736 
1737 static void timer_function_orf_token_timeout (void *data)
1738 {
1739  struct totemsrp_instance *instance = data;
1740 
1741  switch (instance->memb_state) {
1744  "The token was lost in the OPERATIONAL state.");
1746  "A processor failed, forming new configuration.");
1748  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE);
1749  instance->stats.operational_token_lost++;
1750  break;
1751 
1752  case MEMB_STATE_GATHER:
1754  "The consensus timeout expired.");
1755  memb_state_consensus_timeout_expired (instance);
1756  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED);
1757  instance->stats.gather_token_lost++;
1758  break;
1759 
1760  case MEMB_STATE_COMMIT:
1762  "The token was lost in the COMMIT state.");
1763  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE);
1764  instance->stats.commit_token_lost++;
1765  break;
1766 
1767  case MEMB_STATE_RECOVERY:
1769  "The token was lost in the RECOVERY state.");
1770  memb_recovery_state_token_loss (instance);
1771  instance->orf_token_discard = 1;
1772  break;
1773  }
1774 }
1775 
1776 static void timer_function_heartbeat_timeout (void *data)
1777 {
1778  struct totemsrp_instance *instance = data;
1780  "HeartBeat Timer expired Invoking token loss mechanism in state %d ", instance->memb_state);
1781  timer_function_orf_token_timeout(data);
1782 }
1783 
1784 static void memb_timer_function_state_gather (void *data)
1785 {
1786  struct totemsrp_instance *instance = data;
1787  int32_t res;
1788 
1789  switch (instance->memb_state) {
1791  case MEMB_STATE_RECOVERY:
1792  assert (0); /* this should never happen */
1793  break;
1794  case MEMB_STATE_GATHER:
1795  case MEMB_STATE_COMMIT:
1796  memb_join_message_send (instance);
1797 
1798  /*
1799  * Restart the join timeout
1800  `*/
1801  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
1802 
1803  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1804  QB_LOOP_MED,
1805  instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC,
1806  (void *)instance,
1807  memb_timer_function_state_gather,
1808  &instance->memb_timer_state_gather_join_timeout);
1809 
1810  if (res != 0) {
1811  log_printf(instance->totemsrp_log_level_error, "memb_timer_function_state_gather - qb_loop_timer_add error : %d", res);
1812  }
1813  break;
1814  }
1815 }
1816 
/*
 * Consensus-timer callback: forwards to
 * memb_state_consensus_timeout_expired() for the owning instance.
 */
static void memb_timer_function_gather_consensus_timeout (void *data)
{
	memb_state_consensus_timeout_expired ((struct totemsrp_instance *)data);
}
1822 
1823 static void deliver_messages_from_recovery_to_regular (struct totemsrp_instance *instance)
1824 {
1825  unsigned int i;
1826  struct sort_queue_item *recovery_message_item;
1827  struct sort_queue_item regular_message_item;
1828  unsigned int range = 0;
1829  int res;
1830  void *ptr;
1831  struct mcast *mcast;
1832 
1834  "recovery to regular %x-%x", SEQNO_START_MSG + 1, instance->my_aru);
1835 
1836  range = instance->my_aru - SEQNO_START_MSG;
1837  /*
1838  * Move messages from recovery to regular sort queue
1839  */
1840 // todo should i be initialized to 0 or 1 ?
1841  for (i = 1; i <= range; i++) {
1842  res = sq_item_get (&instance->recovery_sort_queue,
1843  i + SEQNO_START_MSG, &ptr);
1844  if (res != 0) {
1845  continue;
1846  }
1847  recovery_message_item = ptr;
1848 
1849  /*
1850  * Convert recovery message into regular message
1851  */
1852  mcast = recovery_message_item->mcast;
1854  /*
1855  * Message is a recovery message encapsulated
1856  * in a new ring message
1857  */
1858  regular_message_item.mcast =
1859  (struct mcast *)(((char *)recovery_message_item->mcast) + sizeof (struct mcast));
1860  regular_message_item.msg_len =
1861  recovery_message_item->msg_len - sizeof (struct mcast);
1862  mcast = regular_message_item.mcast;
1863  } else {
1864  /*
1865  * TODO this case shouldn't happen
1866  */
1867  continue;
1868  }
1869 
1871  "comparing if ring id is for this processors old ring seqno " CS_PRI_RING_ID_SEQ,
1872  (uint64_t)mcast->seq);
1873 
1874  /*
1875  * Only add this message to the regular sort
1876  * queue if it was originated with the same ring
1877  * id as the previous ring
1878  */
1879  if (memcmp (&instance->my_old_ring_id, &mcast->ring_id,
1880  sizeof (struct memb_ring_id)) == 0) {
1881 
1882  res = sq_item_inuse (&instance->regular_sort_queue, mcast->seq);
1883  if (res == 0) {
1884  sq_item_add (&instance->regular_sort_queue,
1885  &regular_message_item, mcast->seq);
1886  if (sq_lt_compare (instance->old_ring_state_high_seq_received, mcast->seq)) {
1888  }
1889  }
1890  } else {
1892  "-not adding msg with seq no " CS_PRI_RING_ID_SEQ, (uint64_t)mcast->seq);
1893  }
1894  }
1895 }
1896 
1897 /*
1898  * Change states in the state machine of the membership algorithm
1899  */
1900 static void memb_state_operational_enter (struct totemsrp_instance *instance)
1901 {
1902  struct srp_addr joined_list[PROCESSOR_COUNT_MAX];
1903  int joined_list_entries = 0;
1904  unsigned int aru_save;
1905  unsigned int joined_list_totemip[PROCESSOR_COUNT_MAX];
1906  unsigned int trans_memb_list_totemip[PROCESSOR_COUNT_MAX];
1907  unsigned int new_memb_list_totemip[PROCESSOR_COUNT_MAX];
1908  unsigned int left_list[PROCESSOR_COUNT_MAX];
1909  unsigned int i;
1910  unsigned int res;
1911  char left_node_msg[1024];
1912  char joined_node_msg[1024];
1913  char failed_node_msg[1024];
1914 
1915  instance->originated_orf_token = 0;
1916 
1917  memb_consensus_reset (instance);
1918 
1919  old_ring_state_reset (instance);
1920 
1921  deliver_messages_from_recovery_to_regular (instance);
1922 
1924  "Delivering to app %x to %x",
1925  instance->my_high_delivered + 1, instance->old_ring_state_high_seq_received);
1926 
1927  aru_save = instance->my_aru;
1928  instance->my_aru = instance->old_ring_state_aru;
1929 
1930  messages_deliver_to_app (instance, 0, instance->old_ring_state_high_seq_received);
1931 
1932  /*
1933  * Calculate joined and left list
1934  */
1935  memb_set_subtract (instance->my_left_memb_list,
1936  &instance->my_left_memb_entries,
1937  instance->my_memb_list, instance->my_memb_entries,
1938  instance->my_trans_memb_list, instance->my_trans_memb_entries);
1939 
1940  memb_set_subtract (joined_list, &joined_list_entries,
1941  instance->my_new_memb_list, instance->my_new_memb_entries,
1942  instance->my_trans_memb_list, instance->my_trans_memb_entries);
1943 
1944  /*
1945  * Install new membership
1946  */
1947  instance->my_memb_entries = instance->my_new_memb_entries;
1948  memcpy (&instance->my_memb_list, instance->my_new_memb_list,
1949  sizeof (struct srp_addr) * instance->my_memb_entries);
1950  instance->last_released = 0;
1951  instance->my_set_retrans_flg = 0;
1952 
1953  /*
1954  * Deliver transitional configuration to application
1955  */
1956  srp_addr_to_nodeid (instance, left_list, instance->my_left_memb_list,
1957  instance->my_left_memb_entries);
1958  srp_addr_to_nodeid (instance, trans_memb_list_totemip,
1959  instance->my_trans_memb_list, instance->my_trans_memb_entries);
1961  trans_memb_list_totemip, instance->my_trans_memb_entries,
1962  left_list, instance->my_left_memb_entries,
1963  0, 0, &instance->my_ring_id);
1964  instance->waiting_trans_ack = 1;
1965  instance->totemsrp_waiting_trans_ack_cb_fn (1);
1966 
1967 // TODO we need to filter to ensure we only deliver those
1968 // messages which are part of instance->my_deliver_memb
1969  messages_deliver_to_app (instance, 1, instance->old_ring_state_high_seq_received);
1970 
1971  instance->my_aru = aru_save;
1972 
1973  /*
1974  * Deliver regular configuration to application
1975  */
1976  srp_addr_to_nodeid (instance, new_memb_list_totemip,
1977  instance->my_new_memb_list, instance->my_new_memb_entries);
1978  srp_addr_to_nodeid (instance, joined_list_totemip, joined_list,
1979  joined_list_entries);
1981  new_memb_list_totemip, instance->my_new_memb_entries,
1982  0, 0,
1983  joined_list_totemip, joined_list_entries, &instance->my_ring_id);
1984 
1985  /*
1986  * The recovery sort queue now becomes the regular
1987  * sort queue. It is necessary to copy the state
1988  * into the regular sort queue.
1989  */
1990  sq_copy (&instance->regular_sort_queue, &instance->recovery_sort_queue);
1991  instance->my_last_aru = SEQNO_START_MSG;
1992 
1993  /* When making my_proc_list smaller, ensure that the
1994  * now non-used entries are zero-ed out. There are some suspect
1995  * assert's that assume that there is always 2 entries in the list.
1996  * These fail when my_proc_list is reduced to 1 entry (and the
1997  * valid [0] entry is the same as the 'unused' [1] entry).
1998  */
1999  memset(instance->my_proc_list, 0,
2000  sizeof (struct srp_addr) * instance->my_proc_list_entries);
2001 
2002  instance->my_proc_list_entries = instance->my_new_memb_entries;
2003  memcpy (instance->my_proc_list, instance->my_new_memb_list,
2004  sizeof (struct srp_addr) * instance->my_memb_entries);
2005 
2006  instance->my_failed_list_entries = 0;
2007  /*
2008  * TODO Not exactly to spec
2009  *
2010  * At the entry to this function all messages without a gap are
2011  * deliered.
2012  *
2013  * This code throw away messages from the last gap in the sort queue
2014  * to my_high_seq_received
2015  *
2016  * What should really happen is we should deliver all messages up to
2017  * a gap, then delier the transitional configuration, then deliver
2018  * the messages between the first gap and my_high_seq_received, then
2019  * deliver a regular configuration, then deliver the regular
2020  * configuration
2021  *
2022  * Unfortunately totempg doesn't appear to like this operating mode
2023  * which needs more inspection
2024  */
2025  i = instance->my_high_seq_received + 1;
2026  do {
2027  void *ptr;
2028 
2029  i -= 1;
2030  res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
2031  if (i == 0) {
2032  break;
2033  }
2034  } while (res);
2035 
2036  instance->my_high_delivered = i;
2037 
2038  for (i = 0; i <= instance->my_high_delivered; i++) {
2039  void *ptr;
2040 
2041  res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
2042  if (res == 0) {
2043  struct sort_queue_item *regular_message;
2044 
2045  regular_message = ptr;
2046  free (regular_message->mcast);
2047  }
2048  }
2049  sq_items_release (&instance->regular_sort_queue, instance->my_high_delivered);
2050  instance->last_released = instance->my_high_delivered;
2051 
2052  if (joined_list_entries) {
2053  int sptr = 0;
2054  sptr += snprintf(joined_node_msg, sizeof(joined_node_msg)-sptr, " joined:");
2055  for (i=0; i< joined_list_entries; i++) {
2056  sptr += snprintf(joined_node_msg+sptr, sizeof(joined_node_msg)-sptr, " " CS_PRI_NODE_ID, joined_list_totemip[i]);
2057  }
2058  }
2059  else {
2060  joined_node_msg[0] = '\0';
2061  }
2062 
2063  if (instance->my_left_memb_entries) {
2064  int sptr = 0;
2065  int sptr2 = 0;
2066  sptr += snprintf(left_node_msg, sizeof(left_node_msg)-sptr, " left:");
2067  for (i=0; i< instance->my_left_memb_entries; i++) {
2068  sptr += snprintf(left_node_msg+sptr, sizeof(left_node_msg)-sptr, " " CS_PRI_NODE_ID, left_list[i]);
2069  }
2070  for (i=0; i< instance->my_left_memb_entries; i++) {
2071  if (my_leave_memb_match(instance, left_list[i]) == 0) {
2072  if (sptr2 == 0) {
2073  sptr2 += snprintf(failed_node_msg, sizeof(failed_node_msg)-sptr2, " failed:");
2074  }
2075  sptr2 += snprintf(failed_node_msg+sptr2, sizeof(left_node_msg)-sptr2, " " CS_PRI_NODE_ID, left_list[i]);
2076  }
2077  }
2078  if (sptr2 == 0) {
2079  failed_node_msg[0] = '\0';
2080  }
2081  }
2082  else {
2083  left_node_msg[0] = '\0';
2084  failed_node_msg[0] = '\0';
2085  }
2086 
2087  my_leave_memb_clear(instance);
2088 
2090  "entering OPERATIONAL state.");
2092  "A new membership (" CS_PRI_RING_ID ") was formed. Members%s%s",
2093  instance->my_ring_id.rep,
2094  (uint64_t)instance->my_ring_id.seq,
2095  joined_node_msg,
2096  left_node_msg);
2097 
2098  if (strlen(failed_node_msg)) {
2100  "Failed to receive the leave message.%s",
2101  failed_node_msg);
2102  }
2103 
2104  instance->memb_state = MEMB_STATE_OPERATIONAL;
2105 
2106  instance->stats.operational_entered++;
2107  instance->stats.continuous_gather = 0;
2108 
2109  instance->my_received_flg = 1;
2110 
2111  reset_pause_timeout (instance);
2112 
2113  /*
2114  * Save ring id information from this configuration to determine
2115  * which processors are transitioning from old regular configuration
2116  * in to new regular configuration on the next configuration change
2117  */
2118  memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
2119  sizeof (struct memb_ring_id));
2120 
2121  return;
2122 }
2123 
2124 static void memb_state_gather_enter (
2125  struct totemsrp_instance *instance,
2126  enum gather_state_from gather_from)
2127 {
2128  int32_t res;
2129 
2130  instance->orf_token_discard = 1;
2131 
2132  instance->originated_orf_token = 0;
2133 
2134  memb_set_merge (
2135  &instance->my_id, 1,
2136  instance->my_proc_list, &instance->my_proc_list_entries);
2137 
2138  memb_join_message_send (instance);
2139 
2140  /*
2141  * Restart the join timeout
2142  */
2143  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
2144 
2145  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
2146  QB_LOOP_MED,
2147  instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC,
2148  (void *)instance,
2149  memb_timer_function_state_gather,
2150  &instance->memb_timer_state_gather_join_timeout);
2151  if (res != 0) {
2152  log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(1) : %d", res);
2153  }
2154 
2155  /*
2156  * Restart the consensus timeout
2157  */
2158  qb_loop_timer_del (instance->totemsrp_poll_handle,
2160 
2161  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
2162  QB_LOOP_MED,
2163  instance->totem_config->consensus_timeout*QB_TIME_NS_IN_MSEC,
2164  (void *)instance,
2165  memb_timer_function_gather_consensus_timeout,
2166  &instance->memb_timer_state_gather_consensus_timeout);
2167  if (res != 0) {
2168  log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(2) : %d", res);
2169  }
2170 
2171  /*
2172  * Cancel the token loss and token retransmission timeouts
2173  */
2174  cancel_token_retransmit_timeout (instance); // REVIEWED
2175  cancel_token_timeout (instance); // REVIEWED
2176  cancel_merge_detect_timeout (instance);
2177 
2178  memb_consensus_reset (instance);
2179 
2180  memb_consensus_set (instance, &instance->my_id);
2181 
2183  "entering GATHER state from %d(%s).",
2184  gather_from, gsfrom_to_msg(gather_from));
2185 
2186  instance->memb_state = MEMB_STATE_GATHER;
2187  instance->stats.gather_entered++;
2188 
2189  if (gather_from == TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED) {
2190  /*
2191  * State 3 means gather, so we are continuously gathering.
2192  */
2193  instance->stats.continuous_gather++;
2194  }
2195 
2196  return;
2197 }
2198 
static void timer_function_token_retransmit_timeout (void *data);

/*
 * Completion callback for setting the commit token target: sends the commit
 * token for the instance passed as context.
 */
static void target_set_completed (
	void *context)
{
	memb_state_commit_token_send ((struct totemsrp_instance *)context);
}
2209 
2210 static void memb_state_commit_enter (
2211  struct totemsrp_instance *instance)
2212 {
2213  old_ring_state_save (instance);
2214 
2215  memb_state_commit_token_update (instance);
2216 
2217  memb_state_commit_token_target_set (instance);
2218 
2219  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
2220 
2222 
2223  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_consensus_timeout);
2224 
2226 
2227  memb_ring_id_set (instance, &instance->commit_token->ring_id);
2228 
2229  instance->memb_ring_id_store (&instance->my_ring_id, instance->my_id.nodeid);
2230 
2231  instance->token_ring_id_seq = instance->my_ring_id.seq;
2232 
2234  "entering COMMIT state.");
2235 
2236  instance->memb_state = MEMB_STATE_COMMIT;
2237  reset_token_retransmit_timeout (instance); // REVIEWED
2238  reset_token_timeout (instance); // REVIEWED
2239 
2240  instance->stats.commit_entered++;
2241  instance->stats.continuous_gather = 0;
2242 
2243  /*
2244  * reset all flow control variables since we are starting a new ring
2245  */
2246  instance->my_trc = 0;
2247  instance->my_pbl = 0;
2248  instance->my_cbl = 0;
2249  /*
2250  * commit token sent after callback that token target has been set
2251  */
2252 }
2253 
2254 static void memb_state_recovery_enter (
2255  struct totemsrp_instance *instance,
2257 {
2258  int i;
2259  int local_received_flg = 1;
2260  unsigned int low_ring_aru;
2261  unsigned int range = 0;
2262  unsigned int messages_originated = 0;
2263  const struct srp_addr *addr;
2264  struct memb_commit_token_memb_entry *memb_list;
2265  struct memb_ring_id my_new_memb_ring_id_list[PROCESSOR_COUNT_MAX];
2266 
2267  addr = (const struct srp_addr *)commit_token->end_of_commit_token;
2268  memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
2269 
2271  "entering RECOVERY state.");
2272 
2273  instance->orf_token_discard = 0;
2274 
2275  instance->my_high_ring_delivered = 0;
2276 
2277  sq_reinit (&instance->recovery_sort_queue, SEQNO_START_MSG);
2278  cs_queue_reinit (&instance->retrans_message_queue);
2279 
2280  low_ring_aru = instance->old_ring_state_high_seq_received;
2281 
2282  memb_state_commit_token_send_recovery (instance, commit_token);
2283 
2284  instance->my_token_seq = SEQNO_START_TOKEN - 1;
2285 
2286  /*
2287  * Build regular configuration
2288  */
2290  instance->totemnet_context,
2291  commit_token->addr_entries);
2292 
2293  /*
2294  * Build transitional configuration
2295  */
2296  for (i = 0; i < instance->my_new_memb_entries; i++) {
2297  memcpy (&my_new_memb_ring_id_list[i],
2298  &memb_list[i].ring_id,
2299  sizeof (struct memb_ring_id));
2300  }
2301  memb_set_and_with_ring_id (
2302  instance->my_new_memb_list,
2303  my_new_memb_ring_id_list,
2304  instance->my_new_memb_entries,
2305  instance->my_memb_list,
2306  instance->my_memb_entries,
2307  &instance->my_old_ring_id,
2308  instance->my_trans_memb_list,
2309  &instance->my_trans_memb_entries);
2310 
2311  for (i = 0; i < instance->my_trans_memb_entries; i++) {
2313  "TRANS [%d] member " CS_PRI_NODE_ID ":", i, instance->my_trans_memb_list[i].nodeid);
2314  }
2315  for (i = 0; i < instance->my_new_memb_entries; i++) {
2317  "position [%d] member " CS_PRI_NODE_ID ":", i, addr[i].nodeid);
2319  "previous ringid (" CS_PRI_RING_ID ")",
2320  memb_list[i].ring_id.rep, (uint64_t)memb_list[i].ring_id.seq);
2321 
2323  "aru %x high delivered %x received flag %d",
2324  memb_list[i].aru,
2325  memb_list[i].high_delivered,
2326  memb_list[i].received_flg);
2327 
2328  // assert (totemip_print (&memb_list[i].ring_id.rep) != 0);
2329  }
2330  /*
2331  * Determine if any received flag is false
2332  */
2333  for (i = 0; i < commit_token->addr_entries; i++) {
2334  if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2335  instance->my_trans_memb_list, instance->my_trans_memb_entries) &&
2336 
2337  memb_list[i].received_flg == 0) {
2338  instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
2339  memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
2340  sizeof (struct srp_addr) * instance->my_trans_memb_entries);
2341  local_received_flg = 0;
2342  break;
2343  }
2344  }
2345  if (local_received_flg == 1) {
2346  goto no_originate;
2347  } /* Else originate messages if we should */
2348 
2349  /*
2350  * Calculate my_low_ring_aru, instance->my_high_ring_delivered for the transitional membership
2351  */
2352  for (i = 0; i < commit_token->addr_entries; i++) {
2353  if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2354  instance->my_deliver_memb_list,
2355  instance->my_deliver_memb_entries) &&
2356 
2357  memcmp (&instance->my_old_ring_id,
2358  &memb_list[i].ring_id,
2359  sizeof (struct memb_ring_id)) == 0) {
2360 
2361  if (sq_lt_compare (memb_list[i].aru, low_ring_aru)) {
2362 
2363  low_ring_aru = memb_list[i].aru;
2364  }
2365  if (sq_lt_compare (instance->my_high_ring_delivered, memb_list[i].high_delivered)) {
2366  instance->my_high_ring_delivered = memb_list[i].high_delivered;
2367  }
2368  }
2369  }
2370 
2371  /*
2372  * Copy all old ring messages to instance->retrans_message_queue
2373  */
2374  range = instance->old_ring_state_high_seq_received - low_ring_aru;
2375  if (range == 0) {
2376  /*
2377  * No messages to copy
2378  */
2379  goto no_originate;
2380  }
2381  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2382 
2384  "copying all old ring messages from %x-%x.",
2385  low_ring_aru + 1, instance->old_ring_state_high_seq_received);
2386 
2387  for (i = 1; i <= range; i++) {
2389  struct message_item message_item;
2390  void *ptr;
2391  int res;
2392 
2393  res = sq_item_get (&instance->regular_sort_queue,
2394  low_ring_aru + i, &ptr);
2395  if (res != 0) {
2396  continue;
2397  }
2398  sort_queue_item = ptr;
2399  messages_originated++;
2400  memset (&message_item, 0, sizeof (struct message_item));
2401  // TODO LEAK
2402  message_item.mcast = totemsrp_buffer_alloc (instance);
2403  assert (message_item.mcast);
2404  memset(message_item.mcast, 0, sizeof (struct mcast));
2408  message_item.mcast->system_from = instance->my_id;
2410 
2411  message_item.mcast->header.nodeid = instance->my_id.nodeid;
2412  assert (message_item.mcast->header.nodeid);
2413  memcpy (&message_item.mcast->ring_id, &instance->my_ring_id,
2414  sizeof (struct memb_ring_id));
2415  message_item.msg_len = sort_queue_item->msg_len + sizeof (struct mcast);
2416  memcpy (((char *)message_item.mcast) + sizeof (struct mcast),
2419  cs_queue_item_add (&instance->retrans_message_queue, &message_item);
2420  }
2422  "Originated %d messages in RECOVERY.", messages_originated);
2423  goto originated;
2424 
2425 no_originate:
2427  "Did not need to originate any messages in recovery.");
2428 
2429 originated:
2430  instance->my_aru = SEQNO_START_MSG;
2431  instance->my_aru_count = 0;
2432  instance->my_seq_unchanged = 0;
2434  instance->my_install_seq = SEQNO_START_MSG;
2435  instance->last_released = SEQNO_START_MSG;
2436 
2437  reset_token_timeout (instance); // REVIEWED
2438  reset_token_retransmit_timeout (instance); // REVIEWED
2439 
2440  instance->memb_state = MEMB_STATE_RECOVERY;
2441  instance->stats.recovery_entered++;
2442  instance->stats.continuous_gather = 0;
2443 
2444  return;
2445 }
2446 
2447 void totemsrp_event_signal (void *srp_context, enum totem_event_type type, int value)
2448 {
2449  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2450 
2451  token_hold_cancel_send (instance);
2452 
2453  return;
2454 }
2455 
2457  void *srp_context,
2458  struct iovec *iovec,
2459  unsigned int iov_len,
2460  int guarantee)
2461 {
2462  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2463  int i;
2464  struct message_item message_item;
2465  char *addr;
2466  unsigned int addr_idx;
2467  struct cs_queue *queue_use;
2468 
2469  if (instance->waiting_trans_ack) {
2470  queue_use = &instance->new_message_queue_trans;
2471  } else {
2472  queue_use = &instance->new_message_queue;
2473  }
2474 
2475  if (cs_queue_is_full (queue_use)) {
2476  log_printf (instance->totemsrp_log_level_debug, "queue full");
2477  return (-1);
2478  }
2479 
2480  memset (&message_item, 0, sizeof (struct message_item));
2481 
2482  /*
2483  * Allocate pending item
2484  */
2485  message_item.mcast = totemsrp_buffer_alloc (instance);
2486  if (message_item.mcast == 0) {
2487  goto error_mcast;
2488  }
2489 
2490  /*
2491  * Set mcast header
2492  */
2493  memset(message_item.mcast, 0, sizeof (struct mcast));
2498 
2499  message_item.mcast->header.nodeid = instance->my_id.nodeid;
2500  assert (message_item.mcast->header.nodeid);
2501 
2503  message_item.mcast->system_from = instance->my_id;
2504 
2505  addr = (char *)message_item.mcast;
2506  addr_idx = sizeof (struct mcast);
2507  for (i = 0; i < iov_len; i++) {
2508  memcpy (&addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len);
2509  addr_idx += iovec[i].iov_len;
2510  }
2511 
2512  message_item.msg_len = addr_idx;
2513 
2514  log_printf (instance->totemsrp_log_level_trace, "mcasted message added to pending queue");
2515  instance->stats.mcast_tx++;
2516  cs_queue_item_add (queue_use, &message_item);
2517 
2518  return (0);
2519 
2520 error_mcast:
2521  return (-1);
2522 }
2523 
2524 /*
2525  * Determine if there is room to queue a new message
2526  */
2527 int totemsrp_avail (void *srp_context)
2528 {
2529  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2530  int avail;
2531  struct cs_queue *queue_use;
2532 
2533  if (instance->waiting_trans_ack) {
2534  queue_use = &instance->new_message_queue_trans;
2535  } else {
2536  queue_use = &instance->new_message_queue;
2537  }
2538  cs_queue_avail (queue_use, &avail);
2539 
2540  return (avail);
2541 }
2542 
2543 /*
2544  * ORF Token Management
2545  */
2546 /*
2547  * Recast message to mcast group if it is available
2548  */
2549 static int orf_token_remcast (
2550  struct totemsrp_instance *instance,
2551  int seq)
2552 {
2554  int res;
2555  void *ptr;
2556 
2557  struct sq *sort_queue;
2558 
2559  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2560  sort_queue = &instance->recovery_sort_queue;
2561  } else {
2562  sort_queue = &instance->regular_sort_queue;
2563  }
2564 
2565  res = sq_in_range (sort_queue, seq);
2566  if (res == 0) {
2567  log_printf (instance->totemsrp_log_level_debug, "sq not in range");
2568  return (-1);
2569  }
2570 
2571  /*
2572  * Get RTR item at seq, if not available, return
2573  */
2574  res = sq_item_get (sort_queue, seq, &ptr);
2575  if (res != 0) {
2576  return -1;
2577  }
2578 
2579  sort_queue_item = ptr;
2580 
2582  instance->totemnet_context,
2585 
2586  return (0);
2587 }
2588 
2589 
2590 /*
2591  * Free all freeable messages from ring
2592  */
2593 static void messages_free (
2594  struct totemsrp_instance *instance,
2595  unsigned int token_aru)
2596 {
2597  struct sort_queue_item *regular_message;
2598  unsigned int i;
2599  int res;
2600  int log_release = 0;
2601  unsigned int release_to;
2602  unsigned int range = 0;
2603 
2604  release_to = token_aru;
2605  if (sq_lt_compare (instance->my_last_aru, release_to)) {
2606  release_to = instance->my_last_aru;
2607  }
2608  if (sq_lt_compare (instance->my_high_delivered, release_to)) {
2609  release_to = instance->my_high_delivered;
2610  }
2611 
2612  /*
2613  * Ensure we dont try release before an already released point
2614  */
2615  if (sq_lt_compare (release_to, instance->last_released)) {
2616  return;
2617  }
2618 
2619  range = release_to - instance->last_released;
2620  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2621 
2622  /*
2623  * Release retransmit list items if group aru indicates they are transmitted
2624  */
2625  for (i = 1; i <= range; i++) {
2626  void *ptr;
2627 
2628  res = sq_item_get (&instance->regular_sort_queue,
2629  instance->last_released + i, &ptr);
2630  if (res == 0) {
2631  regular_message = ptr;
2632  totemsrp_buffer_release (instance, regular_message->mcast);
2633  }
2634  sq_items_release (&instance->regular_sort_queue,
2635  instance->last_released + i);
2636 
2637  log_release = 1;
2638  }
2639  instance->last_released += range;
2640 
2641  if (log_release) {
2643  "releasing messages up to and including %x", release_to);
2644  }
2645 }
2646 
2647 static void update_aru (
2648  struct totemsrp_instance *instance)
2649 {
2650  unsigned int i;
2651  int res;
2652  struct sq *sort_queue;
2653  unsigned int range;
2654  unsigned int my_aru_saved = 0;
2655 
2656  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2657  sort_queue = &instance->recovery_sort_queue;
2658  } else {
2659  sort_queue = &instance->regular_sort_queue;
2660  }
2661 
2662  range = instance->my_high_seq_received - instance->my_aru;
2663 
2664  my_aru_saved = instance->my_aru;
2665  for (i = 1; i <= range; i++) {
2666 
2667  void *ptr;
2668 
2669  res = sq_item_get (sort_queue, my_aru_saved + i, &ptr);
2670  /*
2671  * If hole, stop updating aru
2672  */
2673  if (res != 0) {
2674  break;
2675  }
2676  }
2677  instance->my_aru += i - 1;
2678 }
2679 
2680 /*
2681  * Multicasts pending messages onto the ring (requires orf_token possession)
2682  */
2683 static int orf_token_mcast (
2684  struct totemsrp_instance *instance,
2685  struct orf_token *token,
2686  int fcc_mcasts_allowed)
2687 {
2688  struct message_item *message_item = 0;
2689  struct cs_queue *mcast_queue;
2690  struct sq *sort_queue;
2692  struct mcast *mcast;
2693  unsigned int fcc_mcast_current;
2694 
2695  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2696  mcast_queue = &instance->retrans_message_queue;
2697  sort_queue = &instance->recovery_sort_queue;
2698  reset_token_retransmit_timeout (instance); // REVIEWED
2699  } else {
2700  if (instance->waiting_trans_ack) {
2701  mcast_queue = &instance->new_message_queue_trans;
2702  } else {
2703  mcast_queue = &instance->new_message_queue;
2704  }
2705 
2706  sort_queue = &instance->regular_sort_queue;
2707  }
2708 
2709  for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) {
2710  if (cs_queue_is_empty (mcast_queue)) {
2711  break;
2712  }
2713  message_item = (struct message_item *)cs_queue_item_get (mcast_queue);
2714 
2715  message_item->mcast->seq = ++token->seq;
2716  message_item->mcast->this_seqno = instance->global_seqno++;
2717 
2718  /*
2719  * Build IO vector
2720  */
2721  memset (&sort_queue_item, 0, sizeof (struct sort_queue_item));
2724 
2726 
2727  memcpy (&mcast->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
2728 
2729  /*
2730  * Add message to retransmit queue
2731  */
2732  sq_item_add (sort_queue, &sort_queue_item, message_item->mcast->seq);
2733 
2735  instance->totemnet_context,
2738 
2739  /*
2740  * Delete item from pending queue
2741  */
2742  cs_queue_item_remove (mcast_queue);
2743 
2744  /*
2745  * If messages mcasted, deliver any new messages to totempg
2746  */
2747  instance->my_high_seq_received = token->seq;
2748  }
2749 
2750  update_aru (instance);
2751 
2752  /*
2753  * Return 1 if more messages are available for single node clusters
2754  */
2755  return (fcc_mcast_current);
2756 }
2757 
2758 /*
2759  * Remulticasts messages in orf_token's retransmit list (requires orf_token)
2760  * Modify's orf_token's rtr to include retransmits required by this process
2761  */
2762 static int orf_token_rtr (
2763  struct totemsrp_instance *instance,
2764  struct orf_token *orf_token,
2765  unsigned int *fcc_allowed)
2766 {
2767  unsigned int res;
2768  unsigned int i, j;
2769  unsigned int found;
2770  struct sq *sort_queue;
2771  struct rtr_item *rtr_list;
2772  unsigned int range = 0;
2773  char retransmit_msg[1024];
2774  char value[64];
2775 
2776  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2777  sort_queue = &instance->recovery_sort_queue;
2778  } else {
2779  sort_queue = &instance->regular_sort_queue;
2780  }
2781 
2782  rtr_list = &orf_token->rtr_list[0];
2783 
2784  strcpy (retransmit_msg, "Retransmit List: ");
2785  if (orf_token->rtr_list_entries) {
2787  "Retransmit List %d", orf_token->rtr_list_entries);
2788  for (i = 0; i < orf_token->rtr_list_entries; i++) {
2789  sprintf (value, "%x ", rtr_list[i].seq);
2790  strcat (retransmit_msg, value);
2791  }
2792  strcat (retransmit_msg, "");
2794  "%s", retransmit_msg);
2795  }
2796 
2797  /*
2798  * Retransmit messages on orf_token's RTR list from RTR queue
2799  */
2800  for (instance->fcc_remcast_current = 0, i = 0;
2801  instance->fcc_remcast_current < *fcc_allowed && i < orf_token->rtr_list_entries;) {
2802 
2803  /*
2804  * If this retransmit request isn't from this configuration,
2805  * try next rtr entry
2806  */
2807  if (memcmp (&rtr_list[i].ring_id, &instance->my_ring_id,
2808  sizeof (struct memb_ring_id)) != 0) {
2809 
2810  i += 1;
2811  continue;
2812  }
2813 
2814  res = orf_token_remcast (instance, rtr_list[i].seq);
2815  if (res == 0) {
2816  /*
2817  * Multicasted message, so no need to copy to new retransmit list
2818  */
2820  assert (orf_token->rtr_list_entries >= 0);
2821  memmove (&rtr_list[i], &rtr_list[i + 1],
2822  sizeof (struct rtr_item) * (orf_token->rtr_list_entries - i));
2823 
2824  instance->stats.mcast_retx++;
2825  instance->fcc_remcast_current++;
2826  } else {
2827  i += 1;
2828  }
2829  }
2830  *fcc_allowed = *fcc_allowed - instance->fcc_remcast_current;
2831 
2832  /*
2833  * Add messages to retransmit to RTR list
2834  * but only retry if there is room in the retransmit list
2835  */
2836 
2837  range = orf_token->seq - instance->my_aru;
2838  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2839 
2841  (i <= range); i++) {
2842 
2843  /*
2844  * Ensure message is within the sort queue range
2845  */
2846  res = sq_in_range (sort_queue, instance->my_aru + i);
2847  if (res == 0) {
2848  break;
2849  }
2850 
2851  /*
2852  * Find if a message is missing from this processor
2853  */
2854  res = sq_item_inuse (sort_queue, instance->my_aru + i);
2855  if (res == 0) {
2856  /*
2857  * Determine how many times we have missed receiving
2858  * this sequence number. sq_item_miss_count increments
2859  * a counter for the sequence number. The miss count
2860  * will be returned and compared. This allows time for
2861  * delayed multicast messages to be received before
2862  * declaring the message is missing and requesting a
2863  * retransmit.
2864  */
2865  res = sq_item_miss_count (sort_queue, instance->my_aru + i);
2866  if (res < instance->totem_config->miss_count_const) {
2867  continue;
2868  }
2869 
2870  /*
2871  * Determine if missing message is already in retransmit list
2872  */
2873  found = 0;
2874  for (j = 0; j < orf_token->rtr_list_entries; j++) {
2875  if (instance->my_aru + i == rtr_list[j].seq) {
2876  found = 1;
2877  }
2878  }
2879  if (found == 0) {
2880  /*
2881  * Missing message not found in current retransmit list so add it
2882  */
2884  &instance->my_ring_id, sizeof (struct memb_ring_id));
2885  rtr_list[orf_token->rtr_list_entries].seq = instance->my_aru + i;
2887  }
2888  }
2889  }
2890  return (instance->fcc_remcast_current);
2891 }
2892 
2893 static void token_retransmit (struct totemsrp_instance *instance)
2894 {
2896  instance->orf_token_retransmit,
2897  instance->orf_token_retransmit_size);
2898 }
2899 
2900 /*
2901  * Retransmit the regular token if no mcast or token has
2902  * been received in retransmit token period retransmit
2903  * the token to the next processor
2904  */
2905 static void timer_function_token_retransmit_timeout (void *data)
2906 {
2907  struct totemsrp_instance *instance = data;
2908 
2909  switch (instance->memb_state) {
2910  case MEMB_STATE_GATHER:
2911  break;
2912  case MEMB_STATE_COMMIT:
2914  case MEMB_STATE_RECOVERY:
2915  token_retransmit (instance);
2916  reset_token_retransmit_timeout (instance); // REVIEWED
2917  break;
2918  }
2919 }
2920 
2921 static void timer_function_token_hold_retransmit_timeout (void *data)
2922 {
2923  struct totemsrp_instance *instance = data;
2924 
2925  switch (instance->memb_state) {
2926  case MEMB_STATE_GATHER:
2927  break;
2928  case MEMB_STATE_COMMIT:
2929  break;
2931  case MEMB_STATE_RECOVERY:
2932  token_retransmit (instance);
2933  break;
2934  }
2935 }
2936 
2937 static void timer_function_merge_detect_timeout(void *data)
2938 {
2939  struct totemsrp_instance *instance = data;
2940 
2942 
2943  switch (instance->memb_state) {
2945  if (instance->my_ring_id.rep == instance->my_id.nodeid) {
2946  memb_merge_detect_transmit (instance);
2947  }
2948  break;
2949  case MEMB_STATE_GATHER:
2950  case MEMB_STATE_COMMIT:
2951  case MEMB_STATE_RECOVERY:
2952  break;
2953  }
2954 }
2955 
2956 /*
2957  * Send orf_token to next member (requires orf_token)
2958  */
2959 static int token_send (
2960  struct totemsrp_instance *instance,
2961  struct orf_token *orf_token,
2962  int forward_token)
2963 {
2964  int res = 0;
2965  unsigned int orf_token_size;
2966 
2967  orf_token_size = sizeof (struct orf_token) +
2968  (orf_token->rtr_list_entries * sizeof (struct rtr_item));
2969 
2970  orf_token->header.nodeid = instance->my_id.nodeid;
2971  memcpy (instance->orf_token_retransmit, orf_token, orf_token_size);
2972  instance->orf_token_retransmit_size = orf_token_size;
2973  assert (orf_token->header.nodeid);
2974 
2975  if (forward_token == 0) {
2976  return (0);
2977  }
2978 
2980  orf_token,
2981  orf_token_size);
2982 
2983  return (res);
2984 }
2985 
2986 static int token_hold_cancel_send (struct totemsrp_instance *instance)
2987 {
2989 
2990  /*
2991  * Only cancel if the token is currently held
2992  */
2993  if (instance->my_token_held == 0) {
2994  return (0);
2995  }
2996  instance->my_token_held = 0;
2997 
2998  /*
2999  * Build message
3000  */
3006  memcpy (&token_hold_cancel.ring_id, &instance->my_ring_id,
3007  sizeof (struct memb_ring_id));
3008  assert (token_hold_cancel.header.nodeid);
3009 
3010  instance->stats.token_hold_cancel_tx++;
3011 
3013  sizeof (struct token_hold_cancel));
3014 
3015  return (0);
3016 }
3017 
3018 static int orf_token_send_initial (struct totemsrp_instance *instance)
3019 {
3020  struct orf_token orf_token;
3021  int res;
3022 
3027  orf_token.header.nodeid = instance->my_id.nodeid;
3028  assert (orf_token.header.nodeid);
3031  orf_token.retrans_flg = 1;
3032  instance->my_set_retrans_flg = 1;
3033  instance->stats.orf_token_tx++;
3034 
3035  if (cs_queue_is_empty (&instance->retrans_message_queue) == 1) {
3036  orf_token.retrans_flg = 0;
3037  instance->my_set_retrans_flg = 0;
3038  } else {
3039  orf_token.retrans_flg = 1;
3040  instance->my_set_retrans_flg = 1;
3041  }
3042 
3043  orf_token.aru = 0;
3045  orf_token.aru_addr = instance->my_id.nodeid;
3046 
3047  memcpy (&orf_token.ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
3048  orf_token.fcc = 0;
3049  orf_token.backlog = 0;
3050 
3052 
3053  res = token_send (instance, &orf_token, 1);
3054 
3055  return (res);
3056 }
3057 
3058 static void memb_state_commit_token_update (
3059  struct totemsrp_instance *instance)
3060 {
3061  struct srp_addr *addr;
3062  struct memb_commit_token_memb_entry *memb_list;
3063  unsigned int high_aru;
3064  unsigned int i;
3065 
3066  addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3067  memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
3068 
3069  memcpy (instance->my_new_memb_list, addr,
3070  sizeof (struct srp_addr) * instance->commit_token->addr_entries);
3071 
3072  instance->my_new_memb_entries = instance->commit_token->addr_entries;
3073 
3074  memcpy (&memb_list[instance->commit_token->memb_index].ring_id,
3075  &instance->my_old_ring_id, sizeof (struct memb_ring_id));
3076 
3077  memb_list[instance->commit_token->memb_index].aru = instance->old_ring_state_aru;
3078  /*
3079  * TODO high delivered is really instance->my_aru, but with safe this
3080  * could change?
3081  */
3082  instance->my_received_flg =
3083  (instance->my_aru == instance->my_high_seq_received);
3084 
3085  memb_list[instance->commit_token->memb_index].received_flg = instance->my_received_flg;
3086 
3087  memb_list[instance->commit_token->memb_index].high_delivered = instance->my_high_delivered;
3088  /*
3089  * find high aru up to current memb_index for all matching ring ids
3090  * if any ring id matching memb_index has aru less then high aru set
3091  * received flag for that entry to false
3092  */
3093  high_aru = memb_list[instance->commit_token->memb_index].aru;
3094  for (i = 0; i <= instance->commit_token->memb_index; i++) {
3095  if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
3096  &memb_list[i].ring_id,
3097  sizeof (struct memb_ring_id)) == 0) {
3098 
3099  if (sq_lt_compare (high_aru, memb_list[i].aru)) {
3100  high_aru = memb_list[i].aru;
3101  }
3102  }
3103  }
3104 
3105  for (i = 0; i <= instance->commit_token->memb_index; i++) {
3106  if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
3107  &memb_list[i].ring_id,
3108  sizeof (struct memb_ring_id)) == 0) {
3109 
3110  if (sq_lt_compare (memb_list[i].aru, high_aru)) {
3111  memb_list[i].received_flg = 0;
3112  if (i == instance->commit_token->memb_index) {
3113  instance->my_received_flg = 0;
3114  }
3115  }
3116  }
3117  }
3118 
3119  instance->commit_token->header.nodeid = instance->my_id.nodeid;
3120  instance->commit_token->memb_index += 1;
3121  assert (instance->commit_token->memb_index <= instance->commit_token->addr_entries);
3122  assert (instance->commit_token->header.nodeid);
3123 }
3124 
3125 static void memb_state_commit_token_target_set (
3126  struct totemsrp_instance *instance)
3127 {
3128  struct srp_addr *addr;
3129 
3130  addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3131 
3132  /* Totemnet just looks at the node id */
3134  instance->totemnet_context,
3135  addr[instance->commit_token->memb_index %
3136  instance->commit_token->addr_entries].nodeid);
3137 }
3138 
3139 static int memb_state_commit_token_send_recovery (
3140  struct totemsrp_instance *instance,
3141  struct memb_commit_token *commit_token)
3142 {
3143  unsigned int commit_token_size;
3144 
3145  commit_token->token_seq++;
3146  commit_token->header.nodeid = instance->my_id.nodeid;
3147  commit_token_size = sizeof (struct memb_commit_token) +
3148  ((sizeof (struct srp_addr) +
3149  sizeof (struct memb_commit_token_memb_entry)) * commit_token->addr_entries);
3150  /*
3151  * Make a copy for retransmission if necessary
3152  */
3153  memcpy (instance->orf_token_retransmit, commit_token, commit_token_size);
3154  instance->orf_token_retransmit_size = commit_token_size;
3155 
3156  instance->stats.memb_commit_token_tx++;
3157 
3159  commit_token,
3160  commit_token_size);
3161 
3162  /*
3163  * Request retransmission of the commit token in case it is lost
3164  */
3165  reset_token_retransmit_timeout (instance);
3166  return (0);
3167 }
3168 
3169 static int memb_state_commit_token_send (
3170  struct totemsrp_instance *instance)
3171 {
3172  unsigned int commit_token_size;
3173 
3174  instance->commit_token->token_seq++;
3175  instance->commit_token->header.nodeid = instance->my_id.nodeid;
3176  commit_token_size = sizeof (struct memb_commit_token) +
3177  ((sizeof (struct srp_addr) +
3178  sizeof (struct memb_commit_token_memb_entry)) * instance->commit_token->addr_entries);
3179  /*
3180  * Make a copy for retransmission if necessary
3181  */
3182  memcpy (instance->orf_token_retransmit, instance->commit_token, commit_token_size);
3183  instance->orf_token_retransmit_size = commit_token_size;
3184 
3185  instance->stats.memb_commit_token_tx++;
3186 
3188  instance->commit_token,
3189  commit_token_size);
3190 
3191  /*
3192  * Request retransmission of the commit token in case it is lost
3193  */
3194  reset_token_retransmit_timeout (instance);
3195  return (0);
3196 }
3197 
3198 
3199 static int memb_lowest_in_config (struct totemsrp_instance *instance)
3200 {
3201  struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3202  int token_memb_entries = 0;
3203  int i;
3204  unsigned int lowest_nodeid;
3205 
3206  memb_set_subtract (token_memb, &token_memb_entries,
3207  instance->my_proc_list, instance->my_proc_list_entries,
3208  instance->my_failed_list, instance->my_failed_list_entries);
3209 
3210  /*
3211  * find representative by searching for smallest identifier
3212  */
3213  assert(token_memb_entries > 0);
3214 
3215  lowest_nodeid = token_memb[0].nodeid;
3216  for (i = 1; i < token_memb_entries; i++) {
3217  if (lowest_nodeid > token_memb[i].nodeid) {
3218  lowest_nodeid = token_memb[i].nodeid;
3219  }
3220  }
3221  return (lowest_nodeid == instance->my_id.nodeid);
3222 }
3223 
3224 static int srp_addr_compare (const void *a, const void *b)
3225 {
3226  const struct srp_addr *srp_a = (const struct srp_addr *)a;
3227  const struct srp_addr *srp_b = (const struct srp_addr *)b;
3228 
3229  if (srp_a->nodeid < srp_b->nodeid) {
3230  return -1;
3231  } else if (srp_a->nodeid > srp_b->nodeid) {
3232  return 1;
3233  } else {
3234  return 0;
3235  }
3236 }
3237 
3238 static void memb_state_commit_token_create (
3239  struct totemsrp_instance *instance)
3240 {
3241  struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3242  struct srp_addr *addr;
3243  struct memb_commit_token_memb_entry *memb_list;
3244  int token_memb_entries = 0;
3245 
3247  "Creating commit token because I am the rep.");
3248 
3249  memb_set_subtract (token_memb, &token_memb_entries,
3250  instance->my_proc_list, instance->my_proc_list_entries,
3251  instance->my_failed_list, instance->my_failed_list_entries);
3252 
3253  memset (instance->commit_token, 0, sizeof (struct memb_commit_token));
3254  instance->commit_token->header.magic = TOTEM_MH_MAGIC;
3257  instance->commit_token->header.encapsulated = 0;
3258  instance->commit_token->header.nodeid = instance->my_id.nodeid;
3259  assert (instance->commit_token->header.nodeid);
3260 
3261  instance->commit_token->ring_id.rep = instance->my_id.nodeid;
3262  instance->commit_token->ring_id.seq = instance->token_ring_id_seq + 4;
3263 
3264  /*
3265  * This qsort is necessary to ensure the commit token traverses
3266  * the ring in the proper order
3267  */
3268  qsort (token_memb, token_memb_entries, sizeof (struct srp_addr),
3269  srp_addr_compare);
3270 
3271  instance->commit_token->memb_index = 0;
3272  instance->commit_token->addr_entries = token_memb_entries;
3273 
3274  addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3275  memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
3276 
3277  memcpy (addr, token_memb,
3278  token_memb_entries * sizeof (struct srp_addr));
3279  memset (memb_list, 0,
3280  sizeof (struct memb_commit_token_memb_entry) * token_memb_entries);
3281 }
3282 
3283 static void memb_join_message_send (struct totemsrp_instance *instance)
3284 {
3285  char memb_join_data[40000];
3286  struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3287  char *addr;
3288  unsigned int addr_idx;
3289  size_t msg_len;
3290 
3295  memb_join->header.nodeid = instance->my_id.nodeid;
3296  assert (memb_join->header.nodeid);
3297 
3298  msg_len = sizeof(struct memb_join) +
3299  ((instance->my_proc_list_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr));
3300 
3301  if (msg_len > sizeof(memb_join_data)) {
3303  "memb_join_message too long. Ignoring message.");
3304 
3305  return ;
3306  }
3307 
3308  memb_join->ring_seq = instance->my_ring_id.seq;
3311  memb_join->system_from = instance->my_id;
3312 
3313  /*
3314  * This mess adds the joined and failed processor lists into the join
3315  * message
3316  */
3317  addr = (char *)memb_join;
3318  addr_idx = sizeof (struct memb_join);
3319  memcpy (&addr[addr_idx],
3320  instance->my_proc_list,
3321  instance->my_proc_list_entries *
3322  sizeof (struct srp_addr));
3323  addr_idx +=
3324  instance->my_proc_list_entries *
3325  sizeof (struct srp_addr);
3326  memcpy (&addr[addr_idx],
3327  instance->my_failed_list,
3328  instance->my_failed_list_entries *
3329  sizeof (struct srp_addr));
3330  addr_idx +=
3331  instance->my_failed_list_entries *
3332  sizeof (struct srp_addr);
3333 
3334  if (instance->totem_config->send_join_timeout) {
3335  usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3336  }
3337 
3338  instance->stats.memb_join_tx++;
3339 
3341  instance->totemnet_context,
3342  memb_join,
3343  addr_idx);
3344 }
3345 
/*
 * memb_leave_message_send - announce that this node is leaving the ring.
 *
 * Adds this node to its own failed list, builds the processor list as
 * my_proc_list minus this node, and packs both lists directly after a
 * struct memb_join header inside a 40000-byte stack buffer. If the packed
 * lists would not fit, the function returns without transmitting. When
 * send_join_timeout is non-zero it sleeps a random number of microseconds
 * below send_join_timeout * 1000 before the final transmit call, and it
 * counts the message in stats.memb_join_tx.
 *
 * NOTE(review): this listing omits several lines of the function (the
 * log_printf openers, the header field setup, the transmit call itself);
 * these comments only describe the statements that are visible.
 */
3346 static void memb_leave_message_send (struct totemsrp_instance *instance)
3347 {
3348  char memb_join_data[40000];
3349  struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3350  char *addr;
3351  unsigned int addr_idx;
3352  int active_memb_entries;
3353  struct srp_addr active_memb[PROCESSOR_COUNT_MAX];
3354  size_t msg_len;
3355 
3357  "sending join/leave message");
3358 
3359  /*
3360  * add us to the failed list, and remove us from
3361  * the members list
3362  */
3363  memb_set_merge(
3364  &instance->my_id, 1,
3365  instance->my_failed_list, &instance->my_failed_list_entries);
3366 
3367  memb_set_subtract (active_memb, &active_memb_entries,
3368  instance->my_proc_list, instance->my_proc_list_entries,
3369  &instance->my_id, 1);
3370 
/* Refuse to build a message larger than the stack buffer. */
3371  msg_len = sizeof(struct memb_join) +
3372  ((active_memb_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr));
3373 
3374  if (msg_len > sizeof(memb_join_data)) {
3376  "memb_leave message too long. Ignoring message.");
3377 
3378  return ;
3379  }
3380 
3386 
3387  memb_join->ring_seq = instance->my_ring_id.seq;
3388  memb_join->proc_list_entries = active_memb_entries;
3390  memb_join->system_from = instance->my_id;
3391 
3392  // TODO: CC Maybe use the actual join send routine.
3393  /*
3394  * This mess adds the joined and failed processor lists into the join
3395  * message
3396  */
3397  addr = (char *)memb_join;
3398  addr_idx = sizeof (struct memb_join);
3399  memcpy (&addr[addr_idx],
3400  active_memb,
3401  active_memb_entries *
3402  sizeof (struct srp_addr));
3403  addr_idx +=
3404  active_memb_entries *
3405  sizeof (struct srp_addr);
3406  memcpy (&addr[addr_idx],
3407  instance->my_failed_list,
3408  instance->my_failed_list_entries *
3409  sizeof (struct srp_addr));
3410  addr_idx +=
3411  instance->my_failed_list_entries *
3412  sizeof (struct srp_addr);
3413 
3414 
/* Optional random jitter before transmitting; addr_idx is now the total message length. */
3415  if (instance->totem_config->send_join_timeout) {
3416  usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3417  }
3418  instance->stats.memb_join_tx++;
3419 
3421  instance->totemnet_context,
3422  memb_join,
3423  addr_idx);
3424 }
3425 
/*
 * memb_merge_detect_transmit - advertise this node's current ring id.
 *
 * A receiver on a different ring treats such a message as a reason to merge
 * (see message_handler_memb_merge_detect). The visible statements set
 * system_from to this node's id, copy my_ring_id into the message, assert
 * that the header nodeid is non-zero, and bump stats.memb_merge_detect_tx.
 *
 * NOTE(review): the local message declaration, the header setup and the
 * transmit call are omitted from this listing.
 */
3426 static void memb_merge_detect_transmit (struct totemsrp_instance *instance)
3427 {
3429 
3435  memb_merge_detect.system_from = instance->my_id;
3436  memcpy (&memb_merge_detect.ring_id, &instance->my_ring_id,
3437  sizeof (struct memb_ring_id));
3438  assert (memb_merge_detect.header.nodeid);
3439 
3440  instance->stats.memb_merge_detect_tx++;
3443  sizeof (struct memb_merge_detect));
3444 }
3445 
3446 static void memb_ring_id_set (
3447  struct totemsrp_instance *instance,
3448  const struct memb_ring_id *ring_id)
3449 {
3450 
3451  memcpy (&instance->my_ring_id, ring_id, sizeof (struct memb_ring_id));
3452 }
3453 
3455  void *srp_context,
3456  void **handle_out,
3458  int delete,
3459  int (*callback_fn) (enum totem_callback_token_type type, const void *),
3460  const void *data)
3461 {
/*
 * Register callback_fn to be run on token events of the given type.
 * First sends a token hold cancel, then allocates a handle, stores it in
 * *handle_out, records callback_fn, data, type and the delete flag, and
 * links the handle into the per-instance "received" or "sent" callback
 * list. Returns -1 if malloc fails, otherwise 0.
 * NOTE(review): the function name line and the switch case labels are
 * omitted from this listing; presumably TOTEM_CALLBACK_TOKEN_RECEIVED and
 * TOTEM_CALLBACK_TOKEN_SENT — confirm. A type matching neither label
 * leaves the handle allocated and returned but unlinked.
 */
3462  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
3463  struct token_callback_instance *callback_handle;
3464 
3465  token_hold_cancel_send (instance);
3466 
3467  callback_handle = malloc (sizeof (struct token_callback_instance));
3468  if (callback_handle == 0) {
3469  return (-1);
3470  }
3471  *handle_out = (void *)callback_handle;
3472  qb_list_init (&callback_handle->list);
3473  callback_handle->callback_fn = callback_fn;
3474  callback_handle->data = (void *) data;
3475  callback_handle->callback_type = type;
3476  callback_handle->delete = delete;
3477  switch (type) {
3479  qb_list_add (&callback_handle->list, &instance->token_callback_received_listhead);
3480  break;
3482  qb_list_add (&callback_handle->list, &instance->token_callback_sent_listhead);
3483  break;
3484  }
3485 
3486  return (0);
3487 }
3488 
3489 void totemsrp_callback_token_destroy (void *srp_context, void **handle_out)
3490 {
3491  struct token_callback_instance *h;
3492 
3493  if (*handle_out) {
3494  h = (struct token_callback_instance *)*handle_out;
3495  qb_list_del (&h->list);
3496  free (h);
3497  h = NULL;
3498  *handle_out = 0;
3499  }
3500 }
3501 
/*
 * token_callbacks_execute - run the registered token callbacks of one type.
 *
 * Picks the "received" or "sent" callback list according to type; any
 * other type trips assert(). The list is walked with the _safe iterator so
 * that entries can be unlinked during the walk. Entries whose del flag is
 * 1 are unlinked. If such a one-shot entry reports res == -1 it is
 * re-added at the head of the list, so it is retried on the next token;
 * otherwise a deletable entry is freed.
 *
 * NOTE(review): the type parameter, the token_callback_instance local and
 * the lines that assign del/res (presumably from ->delete and the callback
 * return) are omitted from this listing.
 * NOTE(review): unlinking is gated on del == 1 but freeing on del != 0.
 * A delete value other than 0 or 1 would free an entry that is still
 * linked into the list (use-after-free on the next walk). Consider
 * normalising delete to 0/1 at registration or testing del consistently.
 */
3502 static void token_callbacks_execute (
3503  struct totemsrp_instance *instance,
3505 {
3506  struct qb_list_head *list, *tmp_iter;
3507  struct qb_list_head *callback_listhead = 0;
3509  int res;
3510  int del;
3511 
3512  switch (type) {
3514  callback_listhead = &instance->token_callback_received_listhead;
3515  break;
3517  callback_listhead = &instance->token_callback_sent_listhead;
3518  break;
3519  default:
3520  assert (0);
3521  }
3522 
3523  qb_list_for_each_safe(list, tmp_iter, callback_listhead) {
3524  token_callback_instance = qb_list_entry (list, struct token_callback_instance, list);
3526  if (del == 1) {
3527  qb_list_del (list);
3528  }
3529 
3533  /*
3534  * This callback failed to execute, try it again on the next token
3535  */
3536  if (res == -1 && del == 1) {
3537  qb_list_add (list, callback_listhead);
3538  } else if (del) {
3539  free (token_callback_instance);
3540  }
3541  }
3542 }
3543 
3544 /*
3545  * Flow control functions
3546  */
3547 static unsigned int backlog_get (struct totemsrp_instance *instance)
3548 {
3549  unsigned int backlog = 0;
3550  struct cs_queue *queue_use = NULL;
3551 
3552  if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
3553  if (instance->waiting_trans_ack) {
3554  queue_use = &instance->new_message_queue_trans;
3555  } else {
3556  queue_use = &instance->new_message_queue;
3557  }
3558  } else
3559  if (instance->memb_state == MEMB_STATE_RECOVERY) {
3560  queue_use = &instance->retrans_message_queue;
3561  }
3562 
3563  if (queue_use != NULL) {
3564  backlog = cs_queue_used (queue_use);
3565  }
3566 
3567  instance->stats.token[instance->stats.latest_token].backlog_calc = backlog;
3568  return (backlog);
3569 }
3570 
3571 static int fcc_calculate (
3572  struct totemsrp_instance *instance,
3573  struct orf_token *token)
3574 {
3575  unsigned int transmits_allowed;
3576  unsigned int backlog_calc;
3577 
3578  transmits_allowed = instance->totem_config->max_messages;
3579 
3580  if (transmits_allowed > instance->totem_config->window_size - token->fcc) {
3581  transmits_allowed = instance->totem_config->window_size - token->fcc;
3582  }
3583 
3584  instance->my_cbl = backlog_get (instance);
3585 
3586  /*
3587  * Only do backlog calculation if there is a backlog otherwise
3588  * we would result in div by zero
3589  */
3590  if (token->backlog + instance->my_cbl - instance->my_pbl) {
3591  backlog_calc = (instance->totem_config->window_size * instance->my_pbl) /
3592  (token->backlog + instance->my_cbl - instance->my_pbl);
3593  if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
3594  transmits_allowed = backlog_calc;
3595  }
3596  }
3597 
3598  return (transmits_allowed);
3599 }
3600 
3601 /*
3602  * don't overflow the RTR sort queue
3603  */
3604 static void fcc_rtr_limit (
3605  struct totemsrp_instance *instance,
3606  struct orf_token *token,
3607  unsigned int *transmits_allowed)
3608 {
3609  int check = QUEUE_RTR_ITEMS_SIZE_MAX;
3610  check -= (*transmits_allowed + instance->totem_config->window_size);
3611  assert (check >= 0);
3612  if (sq_lt_compare (instance->last_released +
3613  QUEUE_RTR_ITEMS_SIZE_MAX - *transmits_allowed -
3614  instance->totem_config->window_size,
3615 
3616  token->seq)) {
3617 
3618  *transmits_allowed = 0;
3619  }
3620 }
3621 
3622 static void fcc_token_update (
3623  struct totemsrp_instance *instance,
3624  struct orf_token *token,
3625  unsigned int msgs_transmitted)
3626 {
3627  token->fcc += msgs_transmitted - instance->my_trc;
3628  token->backlog += instance->my_cbl - instance->my_pbl;
3629  instance->my_trc = msgs_transmitted;
3630  instance->my_pbl = instance->my_cbl;
3631 }
3632 
3633 /*
3634  * Sanity checkers
3635  */
/*
 * check_orf_token_sanity - validate the size of a received ORF token.
 *
 * Returns -1 (ignore the message) in any of these cases:
 *   - msg_len exceeds max_msg_len;
 *   - msg_len is shorter than struct orf_token;
 *   - rtr_list_entries (byte-swapped when endian_conversion_needed)
 *     exceeds RETRANSMIT_ENTRIES_MAX;
 *   - msg_len cannot hold that many struct rtr_item entries.
 * Returns 0 otherwise.
 *
 * NOTE(review): rtr_entries is a signed int read from the wire and only its
 * upper bound is checked. A negative value passes the bound test, and its
 * conversion to size_t inside required_len wraps, which can defeat the
 * final length check. Consider also rejecting rtr_entries < 0.
 */
3636 static int check_orf_token_sanity(
3637  const struct totemsrp_instance *instance,
3638  const void *msg,
3639  size_t msg_len,
3640  size_t max_msg_len,
3641  int endian_conversion_needed)
3642 {
3643  int rtr_entries;
3644  const struct orf_token *token = (const struct orf_token *)msg;
3645  size_t required_len;
3646 
3647  if (msg_len > max_msg_len) {
3649  "Received orf_token message is too long... ignoring.");
3650 
3651  return (-1);
3652  }
3653 
3654  if (msg_len < sizeof(struct orf_token)) {
3656  "Received orf_token message is too short... ignoring.");
3657 
3658  return (-1);
3659  }
3660 
3661  if (endian_conversion_needed) {
3662  rtr_entries = swab32(token->rtr_list_entries);
3663  } else {
3664  rtr_entries = token->rtr_list_entries;
3665  }
3666 
3667  if (rtr_entries > RETRANSMIT_ENTRIES_MAX) {
3669  "Received orf_token message rtr_entries is corrupted... ignoring.");
3670 
3671  return (-1);
3672  }
3673 
3674  required_len = sizeof(struct orf_token) + rtr_entries * sizeof(struct rtr_item);
3675  if (msg_len < required_len) {
3677  "Received orf_token message is too short... ignoring.");
3678 
3679  return (-1);
3680  }
3681 
3682  return (0);
3683 }
3684 
/*
 * check_mcast_sanity - reject MCAST messages shorter than struct mcast.
 *
 * Returns -1 when msg_len < sizeof(struct mcast), otherwise 0. msg and
 * endian_conversion_needed are not inspected by the visible code.
 */
3685 static int check_mcast_sanity(
3686  struct totemsrp_instance *instance,
3687  const void *msg,
3688  size_t msg_len,
3689  int endian_conversion_needed)
3690 {
3691 
3692  if (msg_len < sizeof(struct mcast)) {
3694  "Received mcast message is too short... ignoring.");
3695 
3696  return (-1);
3697  }
3698 
3699  return (0);
3700 }
3701 
/*
 * check_memb_merge_detect_sanity - reject merge-detect messages shorter than
 * struct memb_merge_detect.
 *
 * Returns -1 when msg_len < sizeof(struct memb_merge_detect), otherwise 0.
 * msg and endian_conversion_needed are not inspected by the visible code.
 */
3702 static int check_memb_merge_detect_sanity(
3703  struct totemsrp_instance *instance,
3704  const void *msg,
3705  size_t msg_len,
3706  int endian_conversion_needed)
3707 {
3708 
3709  if (msg_len < sizeof(struct memb_merge_detect)) {
3711  "Received memb_merge_detect message is too short... ignoring.");
3712 
3713  return (-1);
3714  }
3715 
3716  return (0);
3717 }
3718 
/*
 * check_memb_join_sanity - validate the size of a received memb_join.
 *
 * Returns -1 when msg_len is shorter than struct memb_join, or shorter than
 * the header plus (proc_list_entries + failed_list_entries) struct srp_addr
 * entries. Returns 0 otherwise.
 *
 * NOTE(review): the lines that load the two counts from mj_msg (and
 * byte-swap them) are omitted from this listing.
 * NOTE(review): proc_list_entries + failed_list_entries is summed as
 * unsigned int before being widened, so peer-chosen counts can wrap the
 * sum to a small value and pass this check while each count is huge.
 * memb_join_process() then walks the lists by the individual counts and
 * could read past the message. Consider bounding each count (e.g. by
 * PROCESSOR_COUNT_MAX) before summing.
 */
3719 static int check_memb_join_sanity(
3720  struct totemsrp_instance *instance,
3721  const void *msg,
3722  size_t msg_len,
3723  int endian_conversion_needed)
3724 {
3725  const struct memb_join *mj_msg = (const struct memb_join *)msg;
3726  unsigned int proc_list_entries;
3727  unsigned int failed_list_entries;
3728  size_t required_len;
3729 
3730  if (msg_len < sizeof(struct memb_join)) {
3732  "Received memb_join message is too short... ignoring.");
3733 
3734  return (-1);
3735  }
3736 
3739 
3740  if (endian_conversion_needed) {
3743  }
3744 
3745  required_len = sizeof(struct memb_join) + ((proc_list_entries + failed_list_entries) * sizeof(struct srp_addr));
3746  if (msg_len < required_len) {
3748  "Received memb_join message is too short... ignoring.");
3749 
3750  return (-1);
3751  }
3752 
3753  return (0);
3754 }
3755 
/*
 * check_memb_commit_token_sanity - validate the size of a received
 * memb_commit_token.
 *
 * Returns -1 when msg_len cannot hold addr_entries (byte-swapped when
 * needed) pairs of struct srp_addr + struct memb_commit_token_memb_entry
 * after the fixed header.
 *
 * NOTE(review): BUG — the first check (msg_len shorter than the fixed
 * header) logs "too short... ignoring." but returns 0, i.e. success. Every
 * other sanity checker here, and the second length check below, return -1
 * in that situation. A caller testing for -1 would therefore go on to use
 * a truncated commit token. This almost certainly should be return (-1).
 */
3756 static int check_memb_commit_token_sanity(
3757  struct totemsrp_instance *instance,
3758  const void *msg,
3759  size_t msg_len,
3760  int endian_conversion_needed)
3761 {
3762  const struct memb_commit_token *mct_msg = (const struct memb_commit_token *)msg;
3763  unsigned int addr_entries;
3764  size_t required_len;
3765 
3766  if (msg_len < sizeof(struct memb_commit_token)) {
3768  "Received memb_commit_token message is too short... ignoring.");
3769 
/* NOTE(review): reports success for a too-short message; should likely be -1. */
3770  return (0);
3771  }
3772 
3773  addr_entries= mct_msg->addr_entries;
3774  if (endian_conversion_needed) {
3776  }
3777 
3778  required_len = sizeof(struct memb_commit_token) +
3779  (addr_entries * (sizeof(struct srp_addr) + sizeof(struct memb_commit_token_memb_entry)));
3780  if (msg_len < required_len) {
3782  "Received memb_commit_token message is too short... ignoring.");
3783 
3784  return (-1);
3785  }
3786 
3787  return (0);
3788 }
3789 
/*
 * check_token_hold_cancel_sanity - reject token_hold_cancel messages shorter
 * than struct token_hold_cancel.
 *
 * Returns -1 when msg_len < sizeof(struct token_hold_cancel), otherwise 0.
 * msg and endian_conversion_needed are not inspected by the visible code.
 */
3790 static int check_token_hold_cancel_sanity(
3791  struct totemsrp_instance *instance,
3792  const void *msg,
3793  size_t msg_len,
3794  int endian_conversion_needed)
3795 {
3796 
3797  if (msg_len < sizeof(struct token_hold_cancel)) {
3799  "Received token_hold_cancel message is too short... ignoring.");
3800 
3801  return (-1);
3802  }
3803 
3804  return (0);
3805 }
3806 
3807 /*
3808  * Message Handlers
3809  */
3810 
/*
 * Timestamp (from qb_util_nano_current_get) of the previous token event.
 * In this listing it is only read and written inside the GIVEINFO debug
 * blocks of message_handler_orf_token.
 * NOTE(review): it is not static, so it has external linkage; consider
 * making it static if nothing outside this file references it.
 */
3811 unsigned long long int tv_old;
3812 /*
3813  * message handler called when TOKEN message type received
3814  */
/*
 * message_handler_orf_token - process a received ORF (ordering) token.
 *
 * Pre-processing:
 *   - Validate the token against the 1500-byte local buffers.
 *   - Drop it if orf_token_discard is set.
 *   - Byte-swap it into token_convert if needed.
 *   - Copy the header and RETRANSMIT_ENTRIES_MAX rtr_item slots into
 *     token_storage.
 *   - Update the merge-detect and token-hold bookkeeping from token->seq.
 *   - Run the TOKEN_RECEIVED callbacks.
 *
 * Per-state handling:
 *   - COMMIT: the token is discarded.
 *   - The case above GATHER (presumably OPERATIONAL; its label is omitted
 *     here) frees messages up to token->aru and falls through.
 *   - GATHER and RECOVERY:
 *     - drop tokens from another ring and tokens whose token_seq was
 *       already seen;
 *     - send retransmits and new multicasts under flow control;
 *     - update aru/aru_addr;
 *     - then either enter GATHER on "FAILED TO RECEIVE", or advance
 *       token_seq, run the recovery retrans-flag / install logic, forward
 *       the token, deliver messages (OPERATIONAL), reset the token timers
 *       and run the TOKEN_SENT callbacks.
 *
 * The heartbeat timer is reset when the token is forwarded and heartbeats
 * are in use, and cancelled otherwise. All visible return paths return 0.
 *
 * NOTE(review): the rtr_list memcpy copies RETRANSMIT_ENTRIES_MAX entries
 * regardless of msg_len or rtr_list_entries, so it can read past the end
 * of the received message (presumably still inside the receive buffer —
 * confirm). Copying only rtr_list_entries entries would avoid this.
 * NOTE(review): several lines (the OPERATIONAL case label, log_printf
 * openers, the call between flushing = 1 and flushing = 0, ...) are
 * omitted from this listing.
 */
3815 static int message_handler_orf_token (
3816  struct totemsrp_instance *instance,
3817  const void *msg,
3818  size_t msg_len,
3819  int endian_conversion_needed)
3820 {
3821  char token_storage[1500];
3822  char token_convert[1500];
3823  struct orf_token *token = NULL;
3824  int forward_token;
3825  unsigned int transmits_allowed;
3826  unsigned int mcasted_retransmit;
3827  unsigned int mcasted_regular;
3828  unsigned int last_aru;
3829 
3830 #ifdef GIVEINFO
3831  unsigned long long tv_current;
3832  unsigned long long tv_diff;
3833 
3834  tv_current = qb_util_nano_current_get ();
3835  tv_diff = tv_current - tv_old;
3836  tv_old = tv_current;
3837 
3839  "Time since last token %0.4f ms", ((float)tv_diff) / 1000000.0);
3840 #endif
3841 
3842  if (check_orf_token_sanity(instance, msg, msg_len, sizeof(token_storage),
3843  endian_conversion_needed) == -1) {
3844  return (0);
3845  }
3846 
3847  if (instance->orf_token_discard) {
3848  return (0);
3849  }
3850 #ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE
3851  if (random()%100 < TEST_DROP_ORF_TOKEN_PERCENTAGE) {
3852  return (0);
3853  }
3854 #endif
3855 
3856  if (endian_conversion_needed) {
3857  orf_token_endian_convert ((struct orf_token *)msg,
3858  (struct orf_token *)token_convert);
3859  msg = (struct orf_token *)token_convert;
3860  }
3861 
3862  /*
3863  * Make copy of token and retransmit list in case we have
3864  * to flush incoming messages from the kernel queue
3865  */
3866  token = (struct orf_token *)token_storage;
3867  memcpy (token, msg, sizeof (struct orf_token));
3868  memcpy (&token->rtr_list[0], (char *)msg + sizeof (struct orf_token),
3869  sizeof (struct rtr_item) * RETRANSMIT_ENTRIES_MAX);
3870 
3871 
3872  /*
3873  * Handle merge detection timeout
3874  */
3875  if (token->seq == instance->my_last_seq) {
3876  start_merge_detect_timeout (instance);
3877  instance->my_seq_unchanged += 1;
3878  } else {
3879  cancel_merge_detect_timeout (instance);
3880  cancel_token_hold_retransmit_timeout (instance);
3881  instance->my_seq_unchanged = 0;
3882  }
3883 
3884  instance->my_last_seq = token->seq;
3885 
3886 #ifdef TEST_RECOVERY_MSG_COUNT
3887  if (instance->memb_state == MEMB_STATE_OPERATIONAL && token->seq > TEST_RECOVERY_MSG_COUNT) {
3888  return (0);
3889  }
3890 #endif
3891  instance->flushing = 1;
3893  instance->flushing = 0;
3894 
3895  /*
3896  * Determine if we should hold (in reality drop) the token
3897  */
3898  instance->my_token_held = 0;
3899  if (instance->my_ring_id.rep == instance->my_id.nodeid &&
3900  instance->my_seq_unchanged > instance->totem_config->seqno_unchanged_const) {
3901  instance->my_token_held = 1;
3902  } else {
3903  if (instance->my_ring_id.rep != instance->my_id.nodeid &&
3904  instance->my_seq_unchanged >= instance->totem_config->seqno_unchanged_const) {
3905  instance->my_token_held = 1;
3906  }
3907  }
3908 
3909  /*
3910  * Hold onto token when there is no activity on ring and
3911  * this processor is the ring rep
3912  */
3913  forward_token = 1;
3914  if (instance->my_ring_id.rep == instance->my_id.nodeid) {
3915  if (instance->my_token_held) {
3916  forward_token = 0;
3917  }
3918  }
3919 
3920  token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_RECEIVED);
3921 
3922  switch (instance->memb_state) {
3923  case MEMB_STATE_COMMIT:
3924  /* Discard token */
3925  break;
3926 
3928  messages_free (instance, token->aru);
3929  /*
3930  * Do NOT add break, this case should also execute code in gather case.
3931  */
3932 
3933  case MEMB_STATE_GATHER:
3934  /*
3935  * DO NOT add break, we use different free mechanism in recovery state
3936  */
3937 
3938  case MEMB_STATE_RECOVERY:
3939  /*
3940  * Discard tokens from another configuration
3941  */
3942  if (memcmp (&token->ring_id, &instance->my_ring_id,
3943  sizeof (struct memb_ring_id)) != 0) {
3944 
3945  if ((forward_token)
3946  && instance->use_heartbeat) {
3947  reset_heartbeat_timeout(instance);
3948  }
3949  else {
3950  cancel_heartbeat_timeout(instance);
3951  }
3952 
3953  return (0); /* discard token */
3954  }
3955 
3956  /*
3957  * Discard retransmitted tokens
3958  */
3959  if (sq_lte_compare (token->token_seq, instance->my_token_seq)) {
3960  return (0); /* discard token */
3961  }
3962  last_aru = instance->my_last_aru;
3963  instance->my_last_aru = token->aru;
3964 
3965  transmits_allowed = fcc_calculate (instance, token);
3966  mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
3967 
3968  if (instance->my_token_held == 1 &&
3969  (token->rtr_list_entries > 0 || mcasted_retransmit > 0)) {
3970  instance->my_token_held = 0;
3971  forward_token = 1;
3972  }
3973 
3974  fcc_rtr_limit (instance, token, &transmits_allowed);
3975  mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
3976 /*
3977 if (mcasted_regular) {
3978 printf ("mcasted regular %d\n", mcasted_regular);
3979 printf ("token seq %d\n", token->seq);
3980 }
3981 */
3982  fcc_token_update (instance, token, mcasted_retransmit +
3983  mcasted_regular);
3984 
3985  if (sq_lt_compare (instance->my_aru, token->aru) ||
3986  instance->my_id.nodeid == token->aru_addr ||
3987  token->aru_addr == 0) {
3988 
3989  token->aru = instance->my_aru;
3990  if (token->aru == token->seq) {
3991  token->aru_addr = 0;
3992  } else {
3993  token->aru_addr = instance->my_id.nodeid;
3994  }
3995  }
3996  if (token->aru == last_aru && token->aru_addr != 0) {
3997  instance->my_aru_count += 1;
3998  } else {
3999  instance->my_aru_count = 0;
4000  }
4001 
4002  /*
4003  * We really don't follow specification there. In specification, OTHER nodes
4004  * detect failure of one node (based on aru_count) and my_id IS NEVER added
4005  * to failed list (so node never mark itself as failed)
4006  */
4007  if (instance->my_aru_count > instance->totem_config->fail_to_recv_const &&
4008  token->aru_addr == instance->my_id.nodeid) {
4009 
4011  "FAILED TO RECEIVE");
4012 
4013  instance->failed_to_recv = 1;
4014 
4015  memb_set_merge (&instance->my_id, 1,
4016  instance->my_failed_list,
4017  &instance->my_failed_list_entries);
4018 
4019  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FAILED_TO_RECEIVE);
4020  } else {
4021  instance->my_token_seq = token->token_seq;
4022  token->token_seq += 1;
4023 
4024  if (instance->memb_state == MEMB_STATE_RECOVERY) {
4025  /*
4026  * instance->my_aru == instance->my_high_seq_received means this processor
4027  * has recovered all messages it can recover
4028  * (ie: its retrans queue is empty)
4029  */
4030  if (cs_queue_is_empty (&instance->retrans_message_queue) == 0) {
4031 
4032  if (token->retrans_flg == 0) {
4033  token->retrans_flg = 1;
4034  instance->my_set_retrans_flg = 1;
4035  }
4036  } else
4037  if (token->retrans_flg == 1 && instance->my_set_retrans_flg) {
4038  token->retrans_flg = 0;
4039  instance->my_set_retrans_flg = 0;
4040  }
4042  "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x",
4043  token->retrans_flg, instance->my_set_retrans_flg,
4044  cs_queue_is_empty (&instance->retrans_message_queue),
4045  instance->my_retrans_flg_count, token->aru);
4046  if (token->retrans_flg == 0) {
4047  instance->my_retrans_flg_count += 1;
4048  } else {
4049  instance->my_retrans_flg_count = 0;
4050  }
4051  if (instance->my_retrans_flg_count == 2) {
4052  instance->my_install_seq = token->seq;
4053  }
4055  "install seq %x aru %x high seq received %x",
4056  instance->my_install_seq, instance->my_aru, instance->my_high_seq_received);
4057  if (instance->my_retrans_flg_count >= 2 &&
4058  instance->my_received_flg == 0 &&
4059  sq_lte_compare (instance->my_install_seq, instance->my_aru)) {
4060  instance->my_received_flg = 1;
4061  instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
/*
 * NOTE(review): the element size below is sizeof (struct totem_ip_address),
 * but my_deliver_memb_list is passed to memb_set_subset() alongside
 * struct srp_addr values in messages_deliver_to_app, so it appears to be an
 * srp_addr array. If so, this over-copies and can overflow the destination
 * — confirm the declared types and use the matching element size.
 */
4062  memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
4063  sizeof (struct totem_ip_address) * instance->my_trans_memb_entries);
4064  }
4065  if (instance->my_retrans_flg_count >= 3 &&
4066  sq_lte_compare (instance->my_install_seq, token->aru)) {
4067  instance->my_rotation_counter += 1;
4068  } else {
4069  instance->my_rotation_counter = 0;
4070  }
4071  if (instance->my_rotation_counter == 2) {
4073  "retrans flag count %x token aru %x install seq %x aru %x %x",
4074  instance->my_retrans_flg_count, token->aru, instance->my_install_seq,
4075  instance->my_aru, token->seq);
4076 
4077  memb_state_operational_enter (instance);
4078  instance->my_rotation_counter = 0;
4079  instance->my_retrans_flg_count = 0;
4080  }
4081  }
4082 
4084  token_send (instance, token, forward_token);
4085 
4086 #ifdef GIVEINFO
4087  tv_current = qb_util_nano_current_get ();
4088  tv_diff = tv_current - tv_old;
4089  tv_old = tv_current;
4091  "I held %0.4f ms",
4092  ((float)tv_diff) / 1000000.0);
4093 #endif
4094  if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
4095  messages_deliver_to_app (instance, 0,
4096  instance->my_high_seq_received);
4097  }
4098 
4099  /*
4100  * Deliver messages after token has been transmitted
4101  * to improve performance
4102  */
/* NOTE(review): the comment above describes the messages_deliver_to_app() call that precedes it; the statements below reset the token timers. */
4103  reset_token_timeout (instance); // REVIEWED
4104  reset_token_retransmit_timeout (instance); // REVIEWED
4105  if (instance->my_id.nodeid == instance->my_ring_id.rep &&
4106  instance->my_token_held == 1) {
4107 
4108  start_token_hold_retransmit_timeout (instance);
4109  }
4110 
4111  token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_SENT);
4112  }
4113  break;
4114  }
4115 
4116  if ((forward_token)
4117  && instance->use_heartbeat) {
4118  reset_heartbeat_timeout(instance);
4119  }
4120  else {
4121  cancel_heartbeat_timeout(instance);
4122  }
4123 
4124  return (0);
4125 }
4126 
/*
 * messages_deliver_to_app - hand messages from the regular sort queue to
 * the delivery callback in sequence order.
 *
 * Walks sequence numbers my_high_delivered+1 .. end_point and stops at the
 * first number outside the sort queue's range.
 *   - skip == 0: stops at the first hole.
 *   - skip != 0: steps over holes, and skips messages whose sender is not
 *     in my_deliver_memb_list.
 * Every consumed sequence number advances my_high_delivered. The callback
 * receives the bytes that follow the struct mcast header, plus a flag that
 * is set when the header magic showed the message needs byte-swapping.
 */
4127 static void messages_deliver_to_app (
4128  struct totemsrp_instance *instance,
4129  int skip,
4130  unsigned int end_point)
4131 {
4132  struct sort_queue_item *sort_queue_item_p;
4133  unsigned int i;
4134  int res;
4135  struct mcast *mcast_in;
4136  struct mcast mcast_header;
4137  unsigned int range = 0;
4138  int endian_conversion_required;
4139  unsigned int my_high_delivered_stored = 0;
4140  struct srp_addr aligned_system_from;
4141 
4142  range = end_point - instance->my_high_delivered;
4143 
4144  if (range) {
4146  "Delivering %x to %x", instance->my_high_delivered,
4147  end_point);
4148  }
4149  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
4150  my_high_delivered_stored = instance->my_high_delivered;
4151 
4152  /*
4153  * Deliver messages in order from rtr queue to pending delivery queue
4154  */
4155  for (i = 1; i <= range; i++) {
4156 
4157  void *ptr = 0;
4158 
4159  /*
4160  * If out of range of sort queue, stop assembly
4161  */
4162  res = sq_in_range (&instance->regular_sort_queue,
4163  my_high_delivered_stored + i);
4164  if (res == 0) {
4165  break;
4166  }
4167 
4168  res = sq_item_get (&instance->regular_sort_queue,
4169  my_high_delivered_stored + i, &ptr);
4170  /*
4171  * If hole, stop assembly
4172  */
4173  if (res != 0 && skip == 0) {
4174  break;
4175  }
4176 
4177  instance->my_high_delivered = my_high_delivered_stored + i;
4178 
4179  if (res != 0) {
4180  continue;
4181 
4182  }
4183 
4184  sort_queue_item_p = ptr;
4185 
4186  mcast_in = sort_queue_item_p->mcast;
4187  assert (mcast_in != (struct mcast *)0xdeadbeef);
4188 
4189  endian_conversion_required = 0;
4190  if (mcast_in->header.magic != TOTEM_MH_MAGIC) {
4191  endian_conversion_required = 1;
4192  mcast_endian_convert (mcast_in, &mcast_header);
4193  } else {
4194  memcpy (&mcast_header, mcast_in, sizeof (struct mcast));
4195  }
4196 
4197  aligned_system_from = mcast_header.system_from;
4198 
4199  /*
4200  * Skip messages not originated in instance->my_deliver_memb
4201  */
4202  if (skip &&
4203  memb_set_subset (&aligned_system_from,
4204  1,
4205  instance->my_deliver_memb_list,
4206  instance->my_deliver_memb_entries) == 0) {
4207 
/* NOTE(review): redundant — my_high_delivered was already set to this value earlier in the iteration. */
4208  instance->my_high_delivered = my_high_delivered_stored + i;
4209 
4210  continue;
4211  }
4212 
4213  /*
4214  * Message found
4215  */
4217  "Delivering MCAST message with seq %x to pending delivery queue",
4218  mcast_header.seq);
4219 
4220  /*
4221  * Message is locally originated multicast
4222  */
/* NOTE(review): despite the comment above, every in-order message reaching this point is delivered, whatever node sent it. */
4223  instance->totemsrp_deliver_fn (
4224  mcast_header.header.nodeid,
4225  ((char *)sort_queue_item_p->mcast) + sizeof (struct mcast),
4226  sort_queue_item_p->msg_len - sizeof (struct mcast),
4227  endian_conversion_required);
4228  }
4229 }
4230 
4231 /*
4232  * recv message handler called when MCAST message type received
4233  */
/*
 * message_handler_mcast - handle a received MCAST message.
 *
 * After the size check, the header is copied (byte-swapped if needed).
 * Encapsulated messages target the recovery sort queue; all others target
 * the regular sort queue.
 *
 * Messages carrying a different ring id are never queued:
 *   - the omitted first case (presumably OPERATIONAL) merges the sender
 *     into my_proc_list and enters GATHER;
 *   - GATHER does the same only when the sender is not already listed;
 *   - COMMIT and RECOVERY count the message in rx_msg_dropped.
 *
 * Messages for the current ring are copied into a freshly allocated buffer
 * and added to the sort queue, unless their sequence number is out of
 * range or already present. my_high_seq_received is raised when needed.
 * Then the aru is updated and, in OPERATIONAL state, messages are
 * delivered. Returns -1 only when the buffer allocation fails; otherwise 0.
 *
 * NOTE(review): assert (msg_len <= FRAME_SIZE_MAX) runs on peer-supplied
 * input; the later msg_len <= FRAME_SIZE_MAX test only protects NDEBUG
 * builds. Whether a larger datagram can reach here depends on the
 * transport — confirm.
 */
4234 static int message_handler_mcast (
4235  struct totemsrp_instance *instance,
4236  const void *msg,
4237  size_t msg_len,
4238  int endian_conversion_needed)
4239 {
4241  struct sq *sort_queue;
4242  struct mcast mcast_header;
4243  struct srp_addr aligned_system_from;
4244 
4245  if (check_mcast_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4246  return (0);
4247  }
4248 
4249  if (endian_conversion_needed) {
4250  mcast_endian_convert (msg, &mcast_header);
4251  } else {
4252  memcpy (&mcast_header, msg, sizeof (struct mcast));
4253  }
4254 
4255  if (mcast_header.header.encapsulated == MESSAGE_ENCAPSULATED) {
4256  sort_queue = &instance->recovery_sort_queue;
4257  } else {
4258  sort_queue = &instance->regular_sort_queue;
4259  }
4260 
4261  assert (msg_len <= FRAME_SIZE_MAX);
4262 
4263 #ifdef TEST_DROP_MCAST_PERCENTAGE
4264  if (random()%100 < TEST_DROP_MCAST_PERCENTAGE) {
4265  return (0);
4266  }
4267 #endif
4268 
4269  /*
4270  * If the message is foreign execute the switch below
4271  */
4272  if (memcmp (&instance->my_ring_id, &mcast_header.ring_id,
4273  sizeof (struct memb_ring_id)) != 0) {
4274 
4275  aligned_system_from = mcast_header.system_from;
4276 
4277  switch (instance->memb_state) {
4279  memb_set_merge (
4280  &aligned_system_from, 1,
4281  instance->my_proc_list, &instance->my_proc_list_entries);
4282  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE);
4283  break;
4284 
4285  case MEMB_STATE_GATHER:
4286  if (!memb_set_subset (
4287  &aligned_system_from,
4288  1,
4289  instance->my_proc_list,
4290  instance->my_proc_list_entries)) {
4291 
4292  memb_set_merge (&aligned_system_from, 1,
4293  instance->my_proc_list, &instance->my_proc_list_entries);
4294  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE);
4295  return (0);
4296  }
4297  break;
4298 
4299  case MEMB_STATE_COMMIT:
4300  /* discard message */
4301  instance->stats.rx_msg_dropped++;
4302  break;
4303 
4304  case MEMB_STATE_RECOVERY:
4305  /* discard message */
4306  instance->stats.rx_msg_dropped++;
4307  break;
4308  }
4309  return (0);
4310  }
4311 
4313  "Received ringid (" CS_PRI_RING_ID ") seq %x",
4314  mcast_header.ring_id.rep,
4315  (uint64_t)mcast_header.ring_id.seq,
4316  mcast_header.seq);
4317 
4318  /*
4319  * Add mcast message to rtr queue if not already in rtr queue
4320  * otherwise free io vectors
4321  */
4322  if (msg_len > 0 && msg_len <= FRAME_SIZE_MAX &&
4323  sq_in_range (sort_queue, mcast_header.seq) &&
4324  sq_item_inuse (sort_queue, mcast_header.seq) == 0) {
4325 
4326  /*
4327  * Allocate new multicast memory block
4328  */
4329 // TODO LEAK
4330  sort_queue_item.mcast = totemsrp_buffer_alloc (instance);
4331  if (sort_queue_item.mcast == NULL) {
4332  return (-1); /* error here is corrected by the algorithm */
4333  }
4334  memcpy (sort_queue_item.mcast, msg, msg_len);
4335  sort_queue_item.msg_len = msg_len;
4336 
4337  if (sq_lt_compare (instance->my_high_seq_received,
4338  mcast_header.seq)) {
4339  instance->my_high_seq_received = mcast_header.seq;
4340  }
4341 
4342  sq_item_add (sort_queue, &sort_queue_item, mcast_header.seq);
4343  }
4344 
4345  update_aru (instance);
4346  if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
4347  messages_deliver_to_app (instance, 0, instance->my_high_seq_received);
4348  }
4349 
4350 /* TODO remove from retrans message queue for old ring in recovery state */
4351  return (0);
4352 }
4353 
/*
 * message_handler_memb_merge_detect - react to a received merge-detect
 * message.
 *
 * Ignores messages that fail the size check or that carry this node's own
 * ring id. Otherwise:
 *   - the omitted first case (presumably OPERATIONAL) merges the sender
 *     into my_proc_list and enters GATHER;
 *   - GATHER does the same only when the sender is not already listed;
 *   - COMMIT and RECOVERY ignore the message.
 * All visible return paths return 0.
 * NOTE(review): the local memb_merge_detect declaration and the first case
 * label are omitted from this listing.
 */
4354 static int message_handler_memb_merge_detect (
4355  struct totemsrp_instance *instance,
4356  const void *msg,
4357  size_t msg_len,
4358  int endian_conversion_needed)
4359 {
4361  struct srp_addr aligned_system_from;
4362 
4363  if (check_memb_merge_detect_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4364  return (0);
4365  }
4366 
4367  if (endian_conversion_needed) {
4368  memb_merge_detect_endian_convert (msg, &memb_merge_detect);
4369  } else {
4370  memcpy (&memb_merge_detect, msg,
4371  sizeof (struct memb_merge_detect));
4372  }
4373 
4374  /*
4375  * do nothing if this is a merge detect from this configuration
4376  */
4377  if (memcmp (&instance->my_ring_id, &memb_merge_detect.ring_id,
4378  sizeof (struct memb_ring_id)) == 0) {
4379 
4380  return (0);
4381  }
4382 
4383  aligned_system_from = memb_merge_detect.system_from;
4384 
4385  /*
4386  * Execute merge operation
4387  */
4388  switch (instance->memb_state) {
4390  memb_set_merge (&aligned_system_from, 1,
4391  instance->my_proc_list, &instance->my_proc_list_entries);
4392  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE);
4393  break;
4394 
4395  case MEMB_STATE_GATHER:
4396  if (!memb_set_subset (
4397  &aligned_system_from,
4398  1,
4399  instance->my_proc_list,
4400  instance->my_proc_list_entries)) {
4401 
4402  memb_set_merge (&aligned_system_from, 1,
4403  instance->my_proc_list, &instance->my_proc_list_entries);
4404  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE);
4405  return (0);
4406  }
4407  break;
4408 
4409  case MEMB_STATE_COMMIT:
4410  /* do nothing in commit */
4411  break;
4412 
4413  case MEMB_STATE_RECOVERY:
4414  /* do nothing in recovery */
4415  break;
4416  }
4417  return (0);
4418 }
4419 
/*
 * memb_join_process - apply a received memb_join to local membership state.
 *
 * proc_list and failed_list point into the message itself, directly after
 * the struct memb_join header, and are sized by the header's counts.
 *
 * Visible flow:
 *   - While flushing, the message is discarded. For a leave, the last
 *     failed_list entry is still passed to my_leave_memb_set. Outside
 *     flushing, a received leave is logged and recorded the same way.
 *   - If the sender's lists equal ours, the sender is recorded in the
 *     consensus set (behind a condition omitted from this listing). Once
 *     consensus is agreed:
 *       - after a failed-to-receive episode, this node resets to a
 *         singleton membership and enters COMMIT;
 *       - otherwise, if it is lowest in the configuration, it creates the
 *         commit token and enters COMMIT.
 *   - If the sender's lists are subsets of ours, or the sender is in our
 *     failed list, nothing changes.
 *   - Otherwise the sender's proc list is merged into ours. If we appear in
 *     the sender's failed list, the sender goes into our failed list;
 *     otherwise failed entries are merged according to whether the sender
 *     is in our membership. Then GATHER is entered.
 *   - Finally, an OPERATIONAL node that did not just enter GATHER enters it.
 *
 * NOTE(review): many lines (the LEAVE_DUMMY_NODEID tests, list-length
 * arguments, log_printf openers) are omitted from this listing; check the
 * full source before relying on these comments.
 */
4420 static void memb_join_process (
4421  struct totemsrp_instance *instance,
4422  const struct memb_join *memb_join)
4423 {
4424  struct srp_addr *proc_list;
4425  struct srp_addr *failed_list;
4426  int gather_entered = 0;
4427  int fail_minus_memb_entries = 0;
4428  struct srp_addr fail_minus_memb[PROCESSOR_COUNT_MAX];
4429  struct srp_addr aligned_system_from;
4430 
4431  proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
4432  failed_list = proc_list + memb_join->proc_list_entries;
4433  aligned_system_from = memb_join->system_from;
4434 
4435  log_printf(instance->totemsrp_log_level_trace, "memb_join_process");
4436  memb_set_log(instance, instance->totemsrp_log_level_trace,
4437  "proclist", proc_list, memb_join->proc_list_entries);
4438  memb_set_log(instance, instance->totemsrp_log_level_trace,
4439  "faillist", failed_list, memb_join->failed_list_entries);
4440  memb_set_log(instance, instance->totemsrp_log_level_trace,
4441  "my_proclist", instance->my_proc_list, instance->my_proc_list_entries);
4442  memb_set_log(instance, instance->totemsrp_log_level_trace,
4443  "my_faillist", instance->my_failed_list, instance->my_failed_list_entries);
4444 
4446  if (instance->flushing) {
4449  "Discarding LEAVE message during flush, nodeid=" CS_PRI_NODE_ID,
4451  if (memb_join->failed_list_entries > 0) {
4452  my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].nodeid);
4453  }
4454  } else {
4456  "Discarding JOIN message during flush, nodeid=" CS_PRI_NODE_ID, memb_join->header.nodeid);
4457  }
4458  return;
4459  } else {
4462  "Received LEAVE message from " CS_PRI_NODE_ID, memb_join->failed_list_entries > 0 ? failed_list[memb_join->failed_list_entries - 1 ].nodeid : LEAVE_DUMMY_NODEID);
4463  if (memb_join->failed_list_entries > 0) {
4464  my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].nodeid);
4465  }
4466  }
4467  }
4468 
4469  }
4470 
4471  if (memb_set_equal (proc_list,
4473  instance->my_proc_list,
4474  instance->my_proc_list_entries) &&
4475 
4476  memb_set_equal (failed_list,
4478  instance->my_failed_list,
4479  instance->my_failed_list_entries)) {
4480 
4482  memb_consensus_set (instance, &aligned_system_from);
4483  }
4484 
4485  if (memb_consensus_agreed (instance) && instance->failed_to_recv == 1) {
4486  instance->failed_to_recv = 0;
4487  instance->my_proc_list[0] = instance->my_id;
4488  instance->my_proc_list_entries = 1;
4489  instance->my_failed_list_entries = 0;
4490 
4491  memb_state_commit_token_create (instance);
4492 
4493  memb_state_commit_enter (instance);
4494  return;
4495  }
4496  if (memb_consensus_agreed (instance) &&
4497  memb_lowest_in_config (instance)) {
4498 
4499  memb_state_commit_token_create (instance);
4500 
4501  memb_state_commit_enter (instance);
4502  } else {
4503  goto out;
4504  }
4505  } else
4506  if (memb_set_subset (proc_list,
4508  instance->my_proc_list,
4509  instance->my_proc_list_entries) &&
4510 
4511  memb_set_subset (failed_list,
4513  instance->my_failed_list,
4514  instance->my_failed_list_entries)) {
4515 
4516  goto out;
4517  } else
4518  if (memb_set_subset (&aligned_system_from, 1,
4519  instance->my_failed_list, instance->my_failed_list_entries)) {
4520 
4521  goto out;
4522  } else {
4523  memb_set_merge (proc_list,
4525  instance->my_proc_list, &instance->my_proc_list_entries);
4526 
4527  if (memb_set_subset (
4528  &instance->my_id, 1,
4529  failed_list, memb_join->failed_list_entries)) {
4530 
4531  memb_set_merge (
4532  &aligned_system_from, 1,
4533  instance->my_failed_list, &instance->my_failed_list_entries);
4534  } else {
4535  if (memb_set_subset (
4536  &aligned_system_from, 1,
4537  instance->my_memb_list,
4538  instance->my_memb_entries)) {
4539 
4540  if (memb_set_subset (
4541  &aligned_system_from, 1,
4542  instance->my_failed_list,
4543  instance->my_failed_list_entries) == 0) {
4544 
4545  memb_set_merge (failed_list,
4547  instance->my_failed_list, &instance->my_failed_list_entries);
4548  } else {
4549  memb_set_subtract (fail_minus_memb,
4550  &fail_minus_memb_entries,
4551  failed_list,
4553  instance->my_memb_list,
4554  instance->my_memb_entries);
4555 
4556  memb_set_merge (fail_minus_memb,
4557  fail_minus_memb_entries,
4558  instance->my_failed_list,
4559  &instance->my_failed_list_entries);
4560  }
4561  }
4562  }
4563  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_JOIN);
4564  gather_entered = 1;
4565  }
4566 
4567 out:
4568  if (gather_entered == 0 &&
4569  instance->memb_state == MEMB_STATE_OPERATIONAL) {
4570 
4571  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE);
4572  }
4573 }
4574 
4575 static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out)
4576 {
4577  int i;
4578  struct srp_addr *in_proc_list;
4579  struct srp_addr *in_failed_list;
4580  struct srp_addr *out_proc_list;
4581  struct srp_addr *out_failed_list;
4582 
4583  out->header.magic = TOTEM_MH_MAGIC;
4585  out->header.type = in->header.type;
4586  out->header.nodeid = swab32 (in->header.nodeid);
4587  out->system_from = srp_addr_endian_convert(in->system_from);
4590  out->ring_seq = swab64 (in->ring_seq);
4591 
4592  in_proc_list = (struct srp_addr *)in->end_of_memb_join;
4593  in_failed_list = in_proc_list + out->proc_list_entries;
4594  out_proc_list = (struct srp_addr *)out->end_of_memb_join;
4595  out_failed_list = out_proc_list + out->proc_list_entries;
4596 
4597  for (i = 0; i < out->proc_list_entries; i++) {
4598  out_proc_list[i] = srp_addr_endian_convert (in_proc_list[i]);
4599  }
4600  for (i = 0; i < out->failed_list_entries; i++) {
4601  out_failed_list[i] = srp_addr_endian_convert (in_failed_list[i]);
4602  }
4603 }
4604 
4605 static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out)
4606 {
4607  int i;
4608  struct srp_addr *in_addr = (struct srp_addr *)in->end_of_commit_token;
4609  struct srp_addr *out_addr = (struct srp_addr *)out->end_of_commit_token;
4610  struct memb_commit_token_memb_entry *in_memb_list;
4611  struct memb_commit_token_memb_entry *out_memb_list;
4612 
4613  out->header.magic = TOTEM_MH_MAGIC;
4615  out->header.type = in->header.type;
4616  out->header.nodeid = swab32 (in->header.nodeid);
4617  out->token_seq = swab32 (in->token_seq);
4618  out->ring_id.rep = swab32(in->ring_id.rep);
4619  out->ring_id.seq = swab64 (in->ring_id.seq);
4620  out->retrans_flg = swab32 (in->retrans_flg);
4621  out->memb_index = swab32 (in->memb_index);
4622  out->addr_entries = swab32 (in->addr_entries);
4623 
4624  in_memb_list = (struct memb_commit_token_memb_entry *)(in_addr + out->addr_entries);
4625  out_memb_list = (struct memb_commit_token_memb_entry *)(out_addr + out->addr_entries);
4626  for (i = 0; i < out->addr_entries; i++) {
4627  out_addr[i] = srp_addr_endian_convert (in_addr[i]);
4628 
4629  /*
4630  * Only convert the memb entry if it has been set
4631  */
4632  if (in_memb_list[i].ring_id.rep != 0) {
4633  out_memb_list[i].ring_id.rep = swab32(in_memb_list[i].ring_id.rep);
4634 
4635  out_memb_list[i].ring_id.seq =
4636  swab64 (in_memb_list[i].ring_id.seq);
4637  out_memb_list[i].aru = swab32 (in_memb_list[i].aru);
4638  out_memb_list[i].high_delivered = swab32 (in_memb_list[i].high_delivered);
4639  out_memb_list[i].received_flg = swab32 (in_memb_list[i].received_flg);
4640  }
4641  }
4642 }
4643 
4644 static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out)
4645 {
4646  int i;
4647 
4648  out->header.magic = TOTEM_MH_MAGIC;
4650  out->header.type = in->header.type;
4651  out->header.nodeid = swab32 (in->header.nodeid);
4652  out->seq = swab32 (in->seq);
4653  out->token_seq = swab32 (in->token_seq);
4654  out->aru = swab32 (in->aru);
4655  out->ring_id.rep = swab32(in->ring_id.rep);
4656  out->aru_addr = swab32(in->aru_addr);
4657  out->ring_id.seq = swab64 (in->ring_id.seq);
4658  out->fcc = swab32 (in->fcc);
4659  out->backlog = swab32 (in->backlog);
4660  out->retrans_flg = swab32 (in->retrans_flg);
4662  for (i = 0; i < out->rtr_list_entries; i++) {
4663  out->rtr_list[i].ring_id.rep = swab32(in->rtr_list[i].ring_id.rep);
4664  out->rtr_list[i].ring_id.seq = swab64 (in->rtr_list[i].ring_id.seq);
4665  out->rtr_list[i].seq = swab32 (in->rtr_list[i].seq);
4666  }
4667 }
4668 
4669 static void mcast_endian_convert (const struct mcast *in, struct mcast *out)
4670 {
4671  out->header.magic = TOTEM_MH_MAGIC;
4673  out->header.type = in->header.type;
4674  out->header.nodeid = swab32 (in->header.nodeid);
4676 
4677  out->seq = swab32 (in->seq);
4678  out->this_seqno = swab32 (in->this_seqno);
4679  out->ring_id.rep = swab32(in->ring_id.rep);
4680  out->ring_id.seq = swab64 (in->ring_id.seq);
4681  out->node_id = swab32 (in->node_id);
4682  out->guarantee = swab32 (in->guarantee);
4683  out->system_from = srp_addr_endian_convert(in->system_from);
4684 }
4685 
4686 static void memb_merge_detect_endian_convert (
4687  const struct memb_merge_detect *in,
4688  struct memb_merge_detect *out)
4689 {
4690  out->header.magic = TOTEM_MH_MAGIC;
4692  out->header.type = in->header.type;
4693  out->header.nodeid = swab32 (in->header.nodeid);
4694  out->ring_id.rep = swab32(in->ring_id.rep);
4695  out->ring_id.seq = swab64 (in->ring_id.seq);
4696  out->system_from = srp_addr_endian_convert (in->system_from);
4697 }
4698 
4699 static int ignore_join_under_operational (
4700  struct totemsrp_instance *instance,
4701  const struct memb_join *memb_join)
4702 {
4703  struct srp_addr *proc_list;
4704  struct srp_addr *failed_list;
4705  unsigned long long ring_seq;
4706  struct srp_addr aligned_system_from;
4707 
4708  proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
4709  failed_list = proc_list + memb_join->proc_list_entries;
4711  aligned_system_from = memb_join->system_from;
4712 
4713  if (memb_set_subset (&instance->my_id, 1,
4714  failed_list, memb_join->failed_list_entries)) {
4715  return (1);
4716  }
4717 
4718  /*
4719  * In operational state, my_proc_list is exactly the same as
4720  * my_memb_list.
4721  */
4722  if ((memb_set_subset (&aligned_system_from, 1,
4723  instance->my_memb_list, instance->my_memb_entries)) &&
4724  (ring_seq < instance->my_ring_id.seq)) {
4725  return (1);
4726  }
4727 
4728  return (0);
4729 }
4730 
4731 static int message_handler_memb_join (
4732  struct totemsrp_instance *instance,
4733  const void *msg,
4734  size_t msg_len,
4735  int endian_conversion_needed)
4736 {
4737  const struct memb_join *memb_join;
4738  struct memb_join *memb_join_convert = alloca (msg_len);
4739  struct srp_addr aligned_system_from;
4740 
4741  if (check_memb_join_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4742  return (0);
4743  }
4744 
4745  if (endian_conversion_needed) {
4746  memb_join = memb_join_convert;
4747  memb_join_endian_convert (msg, memb_join_convert);
4748 
4749  } else {
4750  memb_join = msg;
4751  }
4752 
4753  aligned_system_from = memb_join->system_from;
4754 
4755  /*
4756  * If the process paused because it wasn't scheduled in a timely
4757  * fashion, flush the join messages because they may be queued
4758  * entries
4759  */
4760  if (pause_flush (instance)) {
4761  return (0);
4762  }
4763 
4764  if (instance->token_ring_id_seq < memb_join->ring_seq) {
4765  instance->token_ring_id_seq = memb_join->ring_seq;
4766  }
4767  switch (instance->memb_state) {
4769  if (!ignore_join_under_operational (instance, memb_join)) {
4770  memb_join_process (instance, memb_join);
4771  }
4772  break;
4773 
4774  case MEMB_STATE_GATHER:
4775  memb_join_process (instance, memb_join);
4776  break;
4777 
4778  case MEMB_STATE_COMMIT:
4779  if (memb_set_subset (&aligned_system_from,
4780  1,
4781  instance->my_new_memb_list,
4782  instance->my_new_memb_entries) &&
4783 
4784  memb_join->ring_seq >= instance->my_ring_id.seq) {
4785 
4786  memb_join_process (instance, memb_join);
4787  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE);
4788  }
4789  break;
4790 
4791  case MEMB_STATE_RECOVERY:
4792  if (memb_set_subset (&aligned_system_from,
4793  1,
4794  instance->my_new_memb_list,
4795  instance->my_new_memb_entries) &&
4796 
4797  memb_join->ring_seq >= instance->my_ring_id.seq) {
4798 
4799  memb_join_process (instance, memb_join);
4800  memb_recovery_state_token_loss (instance);
4801  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY);
4802  }
4803  break;
4804  }
4805  return (0);
4806 }
4807 
4808 static int message_handler_memb_commit_token (
4809  struct totemsrp_instance *instance,
4810  const void *msg,
4811  size_t msg_len,
4812  int endian_conversion_needed)
4813 {
4814  struct memb_commit_token *memb_commit_token_convert = alloca (msg_len);
4816  struct srp_addr sub[PROCESSOR_COUNT_MAX];
4817  int sub_entries;
4818 
4819  struct srp_addr *addr;
4820 
4822  "got commit token");
4823 
4824  if (check_memb_commit_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4825  return (0);
4826  }
4827 
4828  if (endian_conversion_needed) {
4829  memb_commit_token_endian_convert (msg, memb_commit_token_convert);
4830  } else {
4831  memcpy (memb_commit_token_convert, msg, msg_len);
4832  }
4833  memb_commit_token = memb_commit_token_convert;
4835 
4836 #ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
4837  if (random()%100 < TEST_DROP_COMMIT_TOKEN_PERCENTAGE) {
4838  return (0);
4839  }
4840 #endif
4841  switch (instance->memb_state) {
4843  /* discard token */
4844  break;
4845 
4846  case MEMB_STATE_GATHER:
4847  memb_set_subtract (sub, &sub_entries,
4848  instance->my_proc_list, instance->my_proc_list_entries,
4849  instance->my_failed_list, instance->my_failed_list_entries);
4850 
4851  if (memb_set_equal (addr,
4853  sub,
4854  sub_entries) &&
4855 
4856  memb_commit_token->ring_id.seq > instance->my_ring_id.seq) {
4857  memcpy (instance->commit_token, memb_commit_token, msg_len);
4858  memb_state_commit_enter (instance);
4859  }
4860  break;
4861 
4862  case MEMB_STATE_COMMIT:
4863  /*
4864  * If retransmitted commit tokens are sent on this ring
4865  * filter them out and only enter recovery once the
4866  * commit token has traversed the array. This is
4867  * determined by :
4868  * memb_commit_token->memb_index == memb_commit_token->addr_entries) {
4869  */
4870  if (memb_commit_token->ring_id.seq == instance->my_ring_id.seq &&
4872  memb_state_recovery_enter (instance, memb_commit_token);
4873  }
4874  break;
4875 
4876  case MEMB_STATE_RECOVERY:
4877  if (instance->my_id.nodeid == instance->my_ring_id.rep) {
4878 
4879  /* Filter out duplicated tokens */
4880  if (instance->originated_orf_token) {
4881  break;
4882  }
4883 
4884  instance->originated_orf_token = 1;
4885 
4887  "Sending initial ORF token");
4888 
4889  // TODO convert instead of initiate
4890  orf_token_send_initial (instance);
4891  reset_token_timeout (instance); // REVIEWED
4892  reset_token_retransmit_timeout (instance); // REVIEWED
4893  }
4894  break;
4895  }
4896  return (0);
4897 }
4898 
4899 static int message_handler_token_hold_cancel (
4900  struct totemsrp_instance *instance,
4901  const void *msg,
4902  size_t msg_len,
4903  int endian_conversion_needed)
4904 {
4905  const struct token_hold_cancel *token_hold_cancel = msg;
4906 
4907  if (check_token_hold_cancel_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4908  return (0);
4909  }
4910 
4911  if (memcmp (&token_hold_cancel->ring_id, &instance->my_ring_id,
4912  sizeof (struct memb_ring_id)) == 0) {
4913 
4914  instance->my_seq_unchanged = 0;
4915  if (instance->my_ring_id.rep == instance->my_id.nodeid) {
4916  timer_function_token_retransmit_timeout (instance);
4917  }
4918  }
4919  return (0);
4920 }
4921 
4922 static int check_message_header_validity(
4923  void *context,
4924  const void *msg,
4925  unsigned int msg_len,
4926  const struct sockaddr_storage *system_from)
4927 {
4928  struct totemsrp_instance *instance = context;
4929  const struct totem_message_header *message_header = msg;
4930  const char *guessed_str;
4931  const char *msg_byte = msg;
4932 
4933  if (msg_len < sizeof (struct totem_message_header)) {
4935  "Message received from %s is too short... Ignoring %u.",
4936  totemip_sa_print((struct sockaddr *)system_from), (unsigned int)msg_len);
4937  return (-1);
4938  }
4939 
4940  if (message_header->magic != TOTEM_MH_MAGIC &&
4941  message_header->magic != swab16(TOTEM_MH_MAGIC)) {
4942  /*
4943  * We've received ether Knet, old version of Corosync,
4944  * or something else. Do some guessing to display (hopefully)
4945  * helpful message
4946  */
4947  guessed_str = NULL;
4948 
4949  if (message_header->magic == 0xFFFF) {
4950  /*
4951  * Corosync 2.2 used header with two UINT8_MAX
4952  */
4953  guessed_str = "Corosync 2.2";
4954  } else if (message_header->magic == 0xFEFE) {
4955  /*
4956  * Corosync 2.3+ used header with two UINT8_MAX - 1
4957  */
4958  guessed_str = "Corosync 2.3+";
4959  } else if (msg_byte[0] == 0x01) {
4960  /*
4961  * Knet has stable1 with first byte of message == 1
4962  */
4963  guessed_str = "unencrypted Kronosnet";
4964  } else if (msg_byte[0] >= 0 && msg_byte[0] <= 5) {
4965  /*
4966  * Unencrypted Corosync 1.x/OpenAIS has first byte
4967  * 0-5. Collision with Knet (but still worth the try)
4968  */
4969  guessed_str = "unencrypted Corosync 2.0/2.1/1.x/OpenAIS";
4970  } else {
4971  /*
4972  * Encrypted Kronosned packet has a hash at the end of
4973  * the packet and nothing specific at the beginning of the
4974  * packet (just encrypted data).
4975  * Encrypted Corosync 1.x/OpenAIS is quite similar but hash_digest
4976  * is in the beginning of the packet.
4977  *
4978  * So it's not possible to reliably detect ether of them.
4979  */
4980  guessed_str = "encrypted Kronosnet/Corosync 2.0/2.1/1.x/OpenAIS or unknown";
4981  }
4982 
4984  "Message received from %s has bad magic number (probably sent by %s).. Ignoring",
4985  totemip_sa_print((struct sockaddr *)system_from),
4986  guessed_str);
4987 
4988  return (-1);
4989  }
4990 
4991  if (message_header->version != TOTEM_MH_VERSION) {
4993  "Message received from %s has unsupported version %u... Ignoring",
4994  totemip_sa_print((struct sockaddr *)system_from),
4995  message_header->version);
4996 
4997  return (-1);
4998  }
4999 
5000  return (0);
5001 }
5002 
5003 
5005  void *context,
5006  const void *msg,
5007  unsigned int msg_len,
5008  const struct sockaddr_storage *system_from)
5009 {
5010  struct totemsrp_instance *instance = context;
5011  const struct totem_message_header *message_header = msg;
5012 
5013  if (check_message_header_validity(context, msg, msg_len, system_from) == -1) {
5014  return ;
5015  }
5016 
5017  switch (message_header->type) {
5019  instance->stats.orf_token_rx++;
5020  break;
5021  case MESSAGE_TYPE_MCAST:
5022  instance->stats.mcast_rx++;
5023  break;
5025  instance->stats.memb_merge_detect_rx++;
5026  break;
5028  instance->stats.memb_join_rx++;
5029  break;
5031  instance->stats.memb_commit_token_rx++;
5032  break;
5034  instance->stats.token_hold_cancel_rx++;
5035  break;
5036  default:
5038  "Message received from %s has wrong type... ignoring %d.\n",
5039  totemip_sa_print((struct sockaddr *)system_from),
5040  (int)message_header->type);
5041 
5042  instance->stats.rx_msg_dropped++;
5043  return;
5044  }
5045  /*
5046  * Handle incoming message
5047  */
5048  totemsrp_message_handlers.handler_functions[(int)message_header->type] (
5049  instance,
5050  msg,
5051  msg_len,
5052  message_header->magic != TOTEM_MH_MAGIC);
5053 }
5054 
5056  void *context,
5057  const struct totem_ip_address *interface_addr,
5058  unsigned short ip_port,
5059  unsigned int iface_no)
5060 {
5061  struct totemsrp_instance *instance = context;
5062  int res;
5063 
5064  totemip_copy(&instance->my_addrs[iface_no], interface_addr);
5065 
5066  res = totemnet_iface_set (
5067  instance->totemnet_context,
5068  interface_addr,
5069  ip_port,
5070  iface_no);
5071 
5072  return (res);
5073 }
5074 
5075 
5077  void *context,
5078  const struct totem_ip_address *iface_addr,
5079  unsigned int iface_no)
5080 {
5081  struct totemsrp_instance *instance = context;
5082  int num_interfaces;
5083  int i;
5084 
5085  if (!instance->my_id.nodeid) {
5086  instance->my_id.nodeid = iface_addr->nodeid;
5087  }
5088  totemip_copy (&instance->my_addrs[iface_no], iface_addr);
5089 
5090  if (instance->iface_changes++ == 0) {
5091  instance->memb_ring_id_create_or_load (&instance->my_ring_id, instance->my_id.nodeid);
5092  /*
5093  * Increase the ring_id sequence number. This doesn't follow specification.
5094  * Solves problem with restarted leader node (node with lowest nodeid) before
5095  * rest of the cluster forms new membership and guarantees unique ring_id for
5096  * new singleton configuration.
5097  */
5098  instance->my_ring_id.seq++;
5099 
5100  instance->token_ring_id_seq = instance->my_ring_id.seq;
5101  log_printf (
5102  instance->totemsrp_log_level_debug,
5103  "Created or loaded sequence id " CS_PRI_RING_ID " for this ring.",
5104  instance->my_ring_id.rep,
5105  (uint64_t)instance->my_ring_id.seq);
5106 
5107  if (instance->totemsrp_service_ready_fn) {
5108  instance->totemsrp_service_ready_fn ();
5109  }
5110 
5111  }
5112 
5113  for (i = 0; i < instance->totem_config->interfaces[iface_no].member_count; i++) {
5114  totemsrp_member_add (instance,
5115  &instance->totem_config->interfaces[iface_no].member_list[i],
5116  iface_no);
5117  }
5118 
5119  num_interfaces = 0;
5120  for (i = 0; i < INTERFACE_MAX; i++) {
5121  if (instance->totem_config->interfaces[i].configured) {
5122  num_interfaces++;
5123  }
5124  }
5125 
5126  if (instance->iface_changes >= num_interfaces) {
5127  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_INTERFACE_CHANGE);
5128  }
5129 }
5130 
5132  totem_config->net_mtu -= 2 * sizeof (struct mcast);
5133 }
5134 
5136  void *context,
5137  void (*totem_service_ready) (void))
5138 {
5139  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5140 
5141  instance->totemsrp_service_ready_fn = totem_service_ready;
5142 }
5143 
5145  void *context,
5146  const struct totem_ip_address *member,
5147  int iface_no)
5148 {
5149  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5150  int res;
5151 
5152  res = totemnet_member_add (instance->totemnet_context, &instance->my_addrs[iface_no], member, iface_no);
5153 
5154  return (res);
5155 }
5156 
5158  void *context,
5159  const struct totem_ip_address *member,
5160  int iface_no)
5161 {
5162  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5163  int res;
5164 
5165  res = totemnet_member_remove (instance->totemnet_context, member, iface_no);
5166 
5167  return (res);
5168 }
5169 
5170 void totemsrp_threaded_mode_enable (void *context)
5171 {
5172  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5173 
5174  instance->threaded_mode_enabled = 1;
5175 }
5176 
5177 void totemsrp_trans_ack (void *context)
5178 {
5179  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5180 
5181  instance->waiting_trans_ack = 0;
5182  instance->totemsrp_waiting_trans_ack_cb_fn (0);
5183 }
5184 
5185 
5186 int totemsrp_reconfigure (void *context, struct totem_config *totem_config)
5187 {
5188  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5189  int res;
5190 
5192  return (res);
5193 }
5194 
5195 void totemsrp_stats_clear (void *context, int flags)
5196 {
5197  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5198 
5199  memset(&instance->stats, 0, sizeof(totemsrp_stats_t));
5202  }
5203 }
5204 
5205 void totemsrp_force_gather (void *context)
5206 {
5207  timer_function_orf_token_timeout(context);
5208 }
totem_config::token_warning
unsigned int token_warning
Definition: totem.h:176
totemsrp_stats_t::rx_msg_dropped
uint64_t rx_msg_dropped
Definition: totemstats.h:77
swab32
#define swab32(x)
The swab32 macro.
Definition: swab.h:51
TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE
Definition: totemsrp.c:545
totem_config::max_messages
unsigned int max_messages
Definition: totem.h:212
memb_join::failed_list_entries
unsigned int failed_list_entries
Definition: totemsrp.c:216
TOTEM_CONFIGURATION_REGULAR
@ TOTEM_CONFIGURATION_REGULAR
Definition: coroapi.h:133
orf_token::seq
unsigned int seq
Definition: totemsrp.c:199
token_callback_instance
Definition: totemsrp.c:166
totemsrp_instance::my_proc_list_entries
int my_proc_list_entries
Definition: totemsrp.c:321
totemsrp_instance::token_sent_event_handle
void * token_sent_event_handle
Definition: totemsrp.c:526
message_handlers
Definition: totemsrp.c:530
TOTEM_CALLBACK_TOKEN_SENT
@ TOTEM_CALLBACK_TOKEN_SENT
Definition: coroapi.h:144
totem_config::heartbeat_failures_allowed
unsigned int heartbeat_failures_allowed
Definition: totem.h:206
totem_logging_configuration::log_level_error
int log_level_error
Definition: totem.h:109
totemsrp_instance::totemsrp_log_level_debug
int totemsrp_log_level_debug
Definition: totemsrp.c:429
rtr_item::seq
unsigned int seq
Definition: totemsrp.c:193
totemnet_mcast_flush_send
int totemnet_mcast_flush_send(void *net_context, const void *msg, unsigned int msg_len)
Definition: totemnet.c:410
totemsrp_instance::memb_ring_id_create_or_load
void(* memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition: totemsrp.c:469
totem_callback_token_type
totem_callback_token_type
The totem_callback_token_type enum.
Definition: coroapi.h:142
totemsrp_crypto_set
int totemsrp_crypto_set(void *srp_context, const char *cipher_type, const char *hash_type)
Definition: totemsrp.c:1085
orf_token::fcc
unsigned int fcc
Definition: totemsrp.c:205
totem_message_header::type
char type
Definition: totem.h:127
totemsrp_instance::my_high_delivered
unsigned int my_high_delivered
Definition: totemsrp.c:383
main_deliver_fn
void main_deliver_fn(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from)
Definition: totemsrp.c:5004
totemsrp_instance::my_leave_memb_entries
int my_leave_memb_entries
Definition: totemsrp.c:335
RETRANS_MESSAGE_QUEUE_SIZE_MAX
#define RETRANS_MESSAGE_QUEUE_SIZE_MAX
Definition: totemsrp.c:94
value
uint32_t value
Definition: exec/votequorum.c:101
memb_commit_token::retrans_flg
unsigned int retrans_flg
Definition: totemsrp.c:252
totem_config::token_retransmits_before_loss_const
unsigned int token_retransmits_before_loss_const
Definition: totem.h:182
totemsrp_stats_t::token_hold_cancel_tx
uint64_t token_hold_cancel_tx
Definition: totemstats.h:66
orf_token::retrans_flg
int retrans_flg
Definition: totemsrp.c:206
totem_configuration_type
totem_configuration_type
The totem_configuration_type enum.
Definition: coroapi.h:132
token_callback_instance::list
struct qb_list_head list
Definition: totemsrp.c:167
totemsrp_stats_t::memb_commit_token_tx
uint64_t memb_commit_token_tx
Definition: totemstats.h:64
totem_config::join_timeout
unsigned int join_timeout
Definition: totem.h:184
totem_logging_configuration::log_level_trace
int log_level_trace
Definition: totem.h:113
totem_config::downcheck_timeout
unsigned int downcheck_timeout
Definition: totem.h:192
TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE
@ TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE
Definition: totemsrp.c:549
TOKEN_SIZE_MAX
#define TOKEN_SIZE_MAX
Definition: totemsrp.c:98
totemnet_send_flush
int totemnet_send_flush(void *net_context)
Definition: totemnet.c:388
totemsrp_instance::timer_pause_timeout
qb_loop_timer_handle timer_pause_timeout
Definition: totemsrp.c:398
gather_state_from_desc
const char * gather_state_from_desc[]
Definition: totemsrp.c:559
totemsrp_stats_t::commit_token_lost
uint64_t commit_token_lost
Definition: totemstats.h:73
consensus_list_item::addr
struct srp_addr addr
Definition: totemsrp.c:161
memb_commit_token_memb_entry::aru
unsigned int aru
Definition: totemsrp.c:242
orf_token::ring_id
struct memb_ring_id ring_id
Definition: totemsrp.c:203
__attribute__
struct message_item __attribute__
totemsrp_instance::totemsrp_log_level_security
int totemsrp_log_level_security
Definition: totemsrp.c:421
totemsrp_instance::last_released
unsigned int last_released
Definition: totemsrp.c:483
TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY
@ TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY
Definition: totemsrp.c:554
totem_interface::member_count
int member_count
Definition: totem.h:88
mcast::node_id
unsigned int node_id
Definition: totemsrp.c:186
message_handlers::handler_functions
int(* handler_functions[6])(struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed)
Definition: totemsrp.c:532
TOTEM_TOKEN_STATS_MAX
#define TOTEM_TOKEN_STATS_MAX
Definition: totemstats.h:89
TOTEMSRP_GSFROM_MERGE_DURING_JOIN
@ TOTEMSRP_GSFROM_MERGE_DURING_JOIN
Definition: totemsrp.c:551
totem_interface::member_list
struct totem_ip_address member_list[PROCESSOR_COUNT_MAX]
Definition: totem.h:95
totemsrp_instance::old_ring_state_saved
int old_ring_state_saved
Definition: totemsrp.c:487
totemsrp_instance::totemsrp_confchg_fn
void(* totemsrp_confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
Definition: totemsrp.c:457
MEMB_STATE_GATHER
@ MEMB_STATE_GATHER
Definition: totemsrp.c:276
totemsrp_stats_t::latest_token
int latest_token
Definition: totemstats.h:88
cs_queue
Definition: cs_queue.h:44
totemsrp_finalize
void totemsrp_finalize(void *srp_context)
Definition: totemsrp.c:1024
MESSAGE_TYPE_ORF_TOKEN
@ MESSAGE_TYPE_ORF_TOKEN
Definition: totemsrp.c:144
totem_config::totem_memb_ring_id_store
void(* totem_memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition: totem.h:242
mcast::ring_id
struct memb_ring_id ring_id
Definition: totemsrp.c:185
totem_message_header::nodeid
unsigned int nodeid
Definition: totem.h:129
totemsrp_instance::my_last_aru
unsigned int my_last_aru
Definition: totemsrp.c:345
token_callback_instance::delete
int delete
Definition: totemsrp.c:170
memb_ring_id::rep
unsigned int rep
Definition: totem.h:148
TOTEMSRP_GSFROM_MAX
@ TOTEMSRP_GSFROM_MAX
Definition: totemsrp.c:556
totem_config::fail_to_recv_const
unsigned int fail_to_recv_const
Definition: totem.h:194
LOGSYS_LEVEL_DEBUG
#define LOGSYS_LEVEL_DEBUG
Definition: logsys.h:76
totem_logging_configuration::log_level_debug
int log_level_debug
Definition: totem.h:112
addr_entries
int addr_entries
Definition: totemsrp.c:7
totemsrp_stats_t::memb_join_rx
uint64_t memb_join_rx
Definition: totemstats.h:60
totemsrp_stats_t
Definition: totemstats.h:53
backlog
unsigned int backlog
Definition: totemsrp.c:8
totemsrp_instance::my_aru
unsigned int my_aru
Definition: totemsrp.c:381
totemsrp_instance::commit_token
struct memb_commit_token * commit_token
Definition: totemsrp.c:511
type
char type
Definition: totem.h:4
TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE
@ TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE
Definition: totemsrp.c:547
log_printf
#define log_printf(level, format, args...)
Definition: totemsrp.c:687
totemsrp.h
QUEUE_RTR_ITEMS_SIZE_MAX
#define QUEUE_RTR_ITEMS_SIZE_MAX
Definition: totemsrp.c:93
MESSAGE_TYPE_MEMB_MERGE_DETECT
@ MESSAGE_TYPE_MEMB_MERGE_DETECT
Definition: totemsrp.c:146
MESSAGE_NOT_ENCAPSULATED
@ MESSAGE_NOT_ENCAPSULATED
Definition: totemsrp.c:154
totem_logging_configuration::log_level_notice
int log_level_notice
Definition: totem.h:111
totemnet_buffer_alloc
void * totemnet_buffer_alloc(void *net_context)
Definition: totemnet.c:351
totemsrp_instance::fcc_remcast_current
int fcc_remcast_current
Definition: totemsrp.c:293
totemsrp_stats_t::earliest_token
int earliest_token
Definition: totemstats.h:87
totemsrp_token_stats_t::rx
uint32_t rx
Definition: totemstats.h:48
totemsrp_instance::my_token_held
int my_token_held
Definition: totemsrp.c:479
totemsrp_service_ready_register
void totemsrp_service_ready_register(void *context, void(*totem_service_ready)(void))
Definition: totemsrp.c:5135
totemsrp_instance::my_cbl
unsigned int my_cbl
Definition: totemsrp.c:507
totemsrp_callback_token_create
int totemsrp_callback_token_create(void *srp_context, void **handle_out, enum totem_callback_token_type type, int delete, int(*callback_fn)(enum totem_callback_token_type type, const void *), const void *data)
Definition: totemsrp.c:3454
mcast
Definition: totemsrp.c:180
totemsrp_token_stats_t::tx
uint32_t tx
Definition: totemstats.h:49
TOTEMSRP_GSFROM_GATHER_MISSING1
@ TOTEMSRP_GSFROM_GATHER_MISSING1
Definition: totemsrp.c:541
high_delivered
unsigned int high_delivered
Definition: totemsrp.c:4
totemsrp_instance::commit_token_storage
char commit_token_storage[40000]
Definition: totemsrp.c:527
MEMB_STATE_OPERATIONAL
@ MEMB_STATE_OPERATIONAL
Definition: totemsrp.c:275
memb_join::proc_list_entries
unsigned int proc_list_entries
Definition: totemsrp.c:215
totemsrp_my_family_get
int totemsrp_my_family_get(void *srp_context)
Definition: totemsrp.c:1110
totemsrp_instance::fcc_mcast_last
int fcc_mcast_last
Definition: totemsrp.c:291
totem_message_header::encapsulated
char encapsulated
Definition: totem.h:128
memb_join::end_of_memb_join
unsigned char end_of_memb_join[0]
Definition: totemsrp.c:218
TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE
@ TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE
Definition: totemsrp.c:550
totemsrp_instance::timer_orf_token_retransmit_timeout
qb_loop_timer_handle timer_orf_token_retransmit_timeout
Definition: totemsrp.c:404
TOTEM_CALLBACK_TOKEN_RECEIVED
@ TOTEM_CALLBACK_TOKEN_RECEIVED
Definition: coroapi.h:143
totemsrp_reconfigure
int totemsrp_reconfigure(void *context, struct totem_config *totem_config)
Definition: totemsrp.c:5186
totem_ip_address::nodeid
unsigned int nodeid
Definition: coroapi.h:112
totemnet_buffer_release
void totemnet_buffer_release(void *net_context, void *ptr)
Definition: totemnet.c:359
totemsrp_socket
Definition: totemsrp.c:175
totemsrp_stats_t::mcast_tx
uint64_t mcast_tx
Definition: totemstats.h:61
totemsrp_net_mtu_adjust
void totemsrp_net_mtu_adjust(struct totem_config *totem_config)
Definition: totemsrp.c:5131
totempg_stats_t
Definition: totemstats.h:94
proc_list_entries
unsigned int proc_list_entries
Definition: totemsrp.c:4
MESSAGE_TYPE_MCAST
@ MESSAGE_TYPE_MCAST
Definition: totemsrp.c:145
totemsrp_instance::my_new_memb_list
struct srp_addr my_new_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:309
totemsrp_instance::my_set_retrans_flg
int my_set_retrans_flg
Definition: totemsrp.c:357
totem_config::merge_timeout
unsigned int merge_timeout
Definition: totem.h:190
totemsrp_instance::my_deliver_memb_list
struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:315
totemsrp_instance::my_trans_memb_entries
int my_trans_memb_entries
Definition: totemsrp.c:327
totemsrp_instance::my_ring_id
struct memb_ring_id my_ring_id
Definition: totemsrp.c:337
CS_PRI_NODE_ID
#define CS_PRI_NODE_ID
Definition: corotypes.h:59
message_item::msg_len
unsigned int msg_len
Definition: totemsrp.c:266
mcast::system_from
struct srp_addr system_from
Definition: totemsrp.c:182
totemsrp_stats_t::mcast_rx
uint64_t mcast_rx
Definition: totemstats.h:63
FRAME_SIZE_MAX
#define FRAME_SIZE_MAX
Definition: totem.h:52
consensus_list_item
Definition: totemsrp.c:160
totemsrp_instance::timer_orf_token_warning
qb_loop_timer_handle timer_orf_token_warning
Definition: totemsrp.c:402
swab.h
totem_config::token_timeout
unsigned int token_timeout
Definition: totem.h:174
totemsrp_instance::failed_to_recv
int failed_to_recv
Definition: totemsrp.c:284
mcast::header
struct totem_message_header header
Definition: totemsrp.c:181
totemsrp_stats_clear
void totemsrp_stats_clear(void *context, int flags)
Definition: totemsrp.c:5195
totemnet_stats_clear
void totemnet_stats_clear(void *net_context)
Definition: totemnet.c:574
totemsrp_token_stats_t::backlog_calc
int backlog_calc
Definition: totemstats.h:50
memb_join::ring_seq
unsigned long long ring_seq
Definition: totemsrp.c:217
totem_config::interfaces
struct totem_interface * interfaces
Definition: totem.h:158
totemsrp_instance::my_aru_count
int my_aru_count
Definition: totemsrp.c:341
addr
unsigned char addr[TOTEMIP_ADDRLEN]
Definition: coroapi.h:77
totemsrp_iface_set
int totemsrp_iface_set(void *context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
Definition: totemsrp.c:5055
totemip_copy
void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
Definition: totemip.c:123
totemsrp_stats_t::orf_token_rx
uint64_t orf_token_rx
Definition: totemstats.h:56
totemsrp_instance::threaded_mode_enabled
uint32_t threaded_mode_enabled
Definition: totemsrp.c:519
TOTEM_MH_MAGIC
#define TOTEM_MH_MAGIC
Definition: totem.h:121
memb_commit_token
Definition: totemsrp.c:248
srp_addr
Definition: totemsrp.c:104
totemnet_token_send
int totemnet_token_send(void *net_context, const void *msg, unsigned int msg_len)
Definition: totemnet.c:398
totemsrp_instance::my_seq_unchanged
int my_seq_unchanged
Definition: totemsrp.c:347
totemsrp_stats_t::memb_merge_detect_rx
uint64_t memb_merge_detect_rx
Definition: totemstats.h:58
sq
The sq struct.
Definition: sq.h:43
message_type
message_type
Definition: totemsrp.c:143
seq
unsigned int seq
Definition: totemsrp.c:4
totemsrp_event_signal
void totemsrp_event_signal(void *srp_context, enum totem_event_type type, int value)
Definition: totemsrp.c:2447
CS_PRI_RING_ID_SEQ
#define CS_PRI_RING_ID_SEQ
Definition: corotypes.h:60
totemsrp_socket::mcast
int mcast
Definition: totemsrp.c:176
totemsrp_instance::consensus_list_entries
int consensus_list_entries
Definition: totemsrp.c:297
MESSAGE_QUEUE_MAX
#define MESSAGE_QUEUE_MAX
Definition: coroapi.h:98
totemnet_reconfigure
int totemnet_reconfigure(void *net_context, struct totem_config *totem_config)
Definition: totemnet.c:560
memb_commit_token::memb_index
int memb_index
Definition: totemsrp.c:253
totem_config::token_hold_timeout
unsigned int token_hold_timeout
Definition: totem.h:180
totemsrp_instance::my_install_seq
unsigned int my_install_seq
Definition: totemsrp.c:353
message_item::mcast
struct mcast * mcast
Definition: totemsrp.c:265
totemsrp_instance::totemsrp_log_level_warning
int totemsrp_log_level_warning
Definition: totemsrp.c:425
totemsrp_instance::memb_ring_id_store
void(* memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition: totemsrp.c:473
totemsrp_instance::old_ring_state_high_seq_received
unsigned int old_ring_state_high_seq_received
Definition: totemsrp.c:491
cs_queue.h
totemsrp_instance::global_seqno
int global_seqno
Definition: totemsrp.c:477
totemnet_finalize
int totemnet_finalize(void *net_context)
Definition: totemnet.c:290
totem_config::net_mtu
unsigned int net_mtu
Definition: totem.h:202
totemsrp_instance::stats
totemsrp_stats_t stats
Definition: totemsrp.c:513
totemsrp_instance::regular_sort_queue
struct sq regular_sort_queue
Definition: totemsrp.c:374
totemsrp_instance::lowest_active_if
int lowest_active_if
Definition: totemsrp.c:299
INTERFACE_MAX
#define INTERFACE_MAX
Definition: coroapi.h:88
totemsrp_stats_t::gather_entered
uint64_t gather_entered
Definition: totemstats.h:70
orf_token
Definition: totemsrp.c:197
rtr_item
Definition: totemsrp.c:191
TOTEMPG_STATS_CLEAR_TRANSPORT
#define TOTEMPG_STATS_CLEAR_TRANSPORT
Definition: totemstats.h:116
totemsrp_instance::my_failed_list
struct srp_addr my_failed_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:307
TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE
@ TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE
Definition: totemsrp.c:553
totemsrp_instance::orf_token_discard
uint32_t orf_token_discard
Definition: totemsrp.c:515
totemsrp_instance::totemsrp_deliver_fn
void(* totemsrp_deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required)
Definition: totemsrp.c:451
orf_token::aru_addr
unsigned int aru_addr
Definition: totemsrp.c:202
totemsrp_instance::old_ring_state_aru
int old_ring_state_aru
Definition: totemsrp.c:489
totem_interface::configured
uint8_t configured
Definition: totem.h:87
aru
unsigned int aru
Definition: totemsrp.c:5
totemsrp_stats_t::mcast_retx
uint64_t mcast_retx
Definition: totemstats.h:62
gather_state_from
gather_state_from
Definition: totemsrp.c:539
MESSAGE_ENCAPSULATED
@ MESSAGE_ENCAPSULATED
Definition: totemsrp.c:153
totemsrp_instance::my_deliver_memb_entries
int my_deliver_memb_entries
Definition: totemsrp.c:331
memb_commit_token::header
struct totem_message_header header
Definition: totemsrp.c:249
token_callback_instance::callback_type
enum totem_callback_token_type callback_type
Definition: totemsrp.c:169
totemsrp_instance::my_left_memb_list
struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:317
message_handlers::count
int count
Definition: totemsrp.c:531
totemsrp_instance::token_callback_received_listhead
struct qb_list_head token_callback_received_listhead
Definition: totemsrp.c:385
totemnet_ifaces_get
int totemnet_ifaces_get(void *net_context, char ***status, unsigned int *iface_count)
Definition: totemnet.c:468
totemsrp_instance::my_retrans_flg_count
int my_retrans_flg_count
Definition: totemsrp.c:359
totemsrp_instance::recovery_sort_queue
struct sq recovery_sort_queue
Definition: totemsrp.c:376
totem_interface::boundto
struct totem_ip_address boundto
Definition: totem.h:83
memb_commit_token::end_of_commit_token
unsigned char end_of_commit_token[0]
Definition: totemsrp.c:255
totemsrp_trans_ack
void totemsrp_trans_ack(void *context)
Definition: totemsrp.c:5177
totem_config::miss_count_const
unsigned int miss_count_const
Definition: totem.h:232
totemsrp_instance::my_old_ring_id
struct memb_ring_id my_old_ring_id
Definition: totemsrp.c:339
totemsrp_instance::totemsrp_service_ready_fn
void(* totemsrp_service_ready_fn)(void)
Definition: totemsrp.c:464
totemnet_member_add
int totemnet_member_add(void *net_context, const struct totem_ip_address *local, const struct totem_ip_address *member, int ring_no)
Definition: totemnet.c:504
totemsrp_instance::my_token_seq
unsigned int my_token_seq
Definition: totemsrp.c:393
totem_logging_configuration::log_subsys_id
int log_subsys_id
Definition: totem.h:114
totem_ip_address::family
unsigned short family
Definition: coroapi.h:113
totem_interface::mcast_addr
struct totem_ip_address mcast_addr
Definition: totem.h:84
totemsrp_force_gather
void totemsrp_force_gather(void *context)
Definition: totemsrp.c:5205
totemsrp_instance::fcc_remcast_last
int fcc_remcast_last
Definition: totemsrp.c:289
totemnet_crypto_set
int totemnet_crypto_set(void *net_context, const char *cipher_type, const char *hash_type)
Definition: totemnet.c:276
MESSAGE_TYPE_MEMB_COMMIT_TOKEN
@ MESSAGE_TYPE_MEMB_COMMIT_TOKEN
Definition: totemsrp.c:148
totemsrp_instance::totemsrp_waiting_trans_ack_cb_fn
void(* totemsrp_waiting_trans_ack_cb_fn)(int waiting_trans_ack)
Definition: totemsrp.c:466
swab16
#define swab16(x)
The swab16 macro.
Definition: swab.h:39
flags
uint32_t flags
Definition: exec/votequorum.c:103
orf_token::token_seq
unsigned int token_seq
Definition: totemsrp.c:200
totem_config
Definition: totem.h:152
totemsrp_instance::my_high_seq_received
unsigned int my_high_seq_received
Definition: totemsrp.c:351
rtr_list
struct rtr_item rtr_list[0]
Definition: totemsrp.c:12
totemsrp_stats_t::token_hold_cancel_rx
uint64_t token_hold_cancel_rx
Definition: totemstats.h:67
totemsrp_instance::my_high_ring_delivered
unsigned int my_high_ring_delivered
Definition: totemsrp.c:361
orf_token::header
struct totem_message_header header
Definition: totemsrp.c:198
TOTEM_MH_VERSION
#define TOTEM_MH_VERSION
Definition: totem.h:122
totemnet.h
TOTEMSRP_GSFROM_FAILED_TO_RECEIVE
@ TOTEMSRP_GSFROM_FAILED_TO_RECEIVE
Definition: totemsrp.c:546
TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED
@ TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED
Definition: totemsrp.c:543
totem_ip_address
The totem_ip_address struct.
Definition: coroapi.h:111
TOTEMSRP_GSFROM_INTERFACE_CHANGE
@ TOTEMSRP_GSFROM_INTERFACE_CHANGE
Definition: totemsrp.c:555
consensus_list_item::set
int set
Definition: totemsrp.c:162
sq.h
totemip_sa_print
const char * totemip_sa_print(const struct sockaddr *sa)
Definition: totemip.c:242
CS_PRI_RING_ID
#define CS_PRI_RING_ID
Definition: corotypes.h:61
totemsrp_instance::my_memb_list
struct srp_addr my_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:313
totemsrp_instance::timer_heartbeat_timeout
qb_loop_timer_handle timer_heartbeat_timeout
Definition: totemsrp.c:416
MEMB_STATE_RECOVERY
@ MEMB_STATE_RECOVERY
Definition: totemsrp.c:278
TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE
Definition: totemsrp.c:542
totemsrp_instance::timer_merge_detect_timeout
qb_loop_timer_handle timer_merge_detect_timeout
Definition: totemsrp.c:408
totem_message_header
Definition: totem.h:124
memb_ring_id
The memb_ring_id struct.
Definition: coroapi.h:122
memb_commit_token::token_seq
unsigned int token_seq
Definition: totemsrp.c:250
memb_join::header
struct totem_message_header header
Definition: totemsrp.c:213
TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE
Definition: totemsrp.c:544
totemsrp_stats_t::continuous_gather
uint32_t continuous_gather
Definition: totemstats.h:78
main_iface_change_fn
void main_iface_change_fn(void *context, const struct totem_ip_address *iface_address, unsigned int iface_no)
Definition: totemsrp.c:5076
rtr_item::ring_id
struct memb_ring_id ring_id
Definition: totemsrp.c:192
totemsrp_instance
Definition: totemsrp.c:281
LEAVE_DUMMY_NODEID
#define LEAVE_DUMMY_NODEID
Definition: totemsrp.c:99
TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT
@ TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT
Definition: totemsrp.c:540
totemsrp_instance::tv_old
struct timeval tv_old
Definition: totemsrp.c:495
ring_seq
unsigned long long ring_seq
Definition: totemsrp.c:6
totemsrp_stats_t::gather_token_lost
uint64_t gather_token_lost
Definition: totemstats.h:71
totemsrp_instance::consensus_list
struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:295
srp_addr::nodeid
unsigned int nodeid
Definition: totemsrp.c:105
totemsrp_instance::memb_timer_state_commit_timeout
qb_loop_timer_handle memb_timer_state_commit_timeout
Definition: totemsrp.c:414
totemsrp_instance::my_id
struct srp_addr my_id
Definition: totemsrp.c:301
totemsrp_instance::my_pbl
unsigned int my_pbl
Definition: totemsrp.c:505
sort_queue_item::msg_len
unsigned int msg_len
Definition: totemsrp.c:271
system_from
struct srp_addr system_from
Definition: totemsrp.c:3
memb_commit_token_memb_entry::high_delivered
unsigned int high_delivered
Definition: totemsrp.c:243
totemsrp_instance::orf_token_retransmit
char orf_token_retransmit[TOKEN_SIZE_MAX]
Definition: totemsrp.c:389
totem_logging_configuration::log_level_warning
int log_level_warning
Definition: totem.h:110
totem_logging_configuration::log_level_security
void(*) in log_level_security)
Definition: totem.h:106
orf_token::aru
unsigned int aru
Definition: totemsrp.c:201
totem_event_type
totem_event_type
Definition: totem.h:259
totemsrp_stats_t::token
totemsrp_token_stats_t token[TOTEM_TOKEN_STATS_MAX]
Definition: totemstats.h:90
totemsrp_instance::my_proc_list
struct srp_addr my_proc_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:305
failed_list_entries
unsigned int failed_list_entries
Definition: totemsrp.c:5
memb_merge_detect
Definition: totemsrp.c:227
totemsrp_instance::my_last_seq
unsigned int my_last_seq
Definition: totemsrp.c:493
totemnet_mcast_noflush_send
int totemnet_mcast_noflush_send(void *net_context, const void *msg, unsigned int msg_len)
Definition: totemnet.c:423
memb_commit_token_memb_entry::received_flg
unsigned int received_flg
Definition: totemsrp.c:244
totemsrp_instance::use_heartbeat
unsigned int use_heartbeat
Definition: totemsrp.c:501
totemsrp_instance::set_aru
unsigned int set_aru
Definition: totemsrp.c:485
totemsrp_stats_t::memb_merge_detect_tx
uint64_t memb_merge_detect_tx
Definition: totemstats.h:57
swab64
#define swab64(x)
The swab64 macro.
Definition: swab.h:65
memb_merge_detect::ring_id
struct memb_ring_id ring_id
Definition: totemsrp.c:230
totemsrp_instance::my_rotation_counter
int my_rotation_counter
Definition: totemsrp.c:355
totemsrp_stats_t::memb_commit_token_rx
uint64_t memb_commit_token_rx
Definition: totemstats.h:65
token_hold_cancel
Definition: totemsrp.c:234
totemsrp_instance::my_trans_memb_list
struct srp_addr my_trans_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:311
totem_config::token_retransmit_timeout
unsigned int token_retransmit_timeout
Definition: totem.h:178
SEQNO_START_TOKEN
#define SEQNO_START_TOKEN
Definition: totemsrp.c:119
totemnet_iface_set
int totemnet_iface_set(void *net_context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
Definition: totemnet.c:455
totemsrp_instance::totemsrp_log_level_error
int totemsrp_log_level_error
Definition: totemsrp.c:423
RETRANSMIT_ENTRIES_MAX
#define RETRANSMIT_ENTRIES_MAX
Definition: totemsrp.c:97
totem_message_header::magic
unsigned short magic
Definition: totem.h:125
token_callback_instance::data
void * data
Definition: totemsrp.c:171
totemsrp_instance::my_received_flg
int my_received_flg
Definition: totemsrp.c:349
totem_message_header::version
char version
Definition: totem.h:126
totemsrp_message_handlers
struct message_handlers totemsrp_message_handlers
Definition: totemsrp.c:675
nodeid
unsigned int nodeid
Definition: coroapi.h:75
totemsrp_instance::my_trc
unsigned int my_trc
Definition: totemsrp.c:503
totemnet_token_target_set
int totemnet_token_target_set(void *net_context, unsigned int nodeid)
Definition: totemnet.c:481
totem_config::totem_memb_ring_id_create_or_load
void(* totem_memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition: totem.h:238
mcast::seq
unsigned int seq
Definition: totemsrp.c:183
totemsrp_stats_t::operational_entered
uint64_t operational_entered
Definition: totemstats.h:68
totemsrp_instance::totem_config
struct totem_config * totem_config
Definition: totemsrp.c:499
totemsrp_stats_t::memb_join_tx
uint64_t memb_join_tx
Definition: totemstats.h:59
totemsrp_stats_t::recovery_entered
uint64_t recovery_entered
Definition: totemstats.h:74
totemsrp_instance::token_ring_id_seq
unsigned long long token_ring_id_seq
Definition: totemsrp.c:481
totemsrp_member_add
int totemsrp_member_add(void *context, const struct totem_ip_address *member, int iface_no)
Definition: totemsrp.c:5144
totemsrp_socket::token
int token
Definition: totemsrp.c:177
totem_config::seqno_unchanged_const
unsigned int seqno_unchanged_const
Definition: totem.h:196
memb_ring_id::seq
unsigned long long seq
Definition: coroapi.h:124
totemsrp_instance::totemsrp_log_printf
void(* totemsrp_log_printf)(int level, int subsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf
Definition: totemsrp.c:435
ring_id
struct memb_ring_id ring_id
Definition: totemsrp.c:6
totemsrp_avail
int totemsrp_avail(void *srp_context)
Return number of available messages that can be queued.
Definition: totemsrp.c:2527
memb_commit_token_memb_entry
Definition: totemsrp.c:240
totemsrp_stats_t::consensus_timeouts
uint64_t consensus_timeouts
Definition: totemstats.h:76
TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE
@ TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE
Definition: totemsrp.c:552
totemsrp_instance::new_message_queue
struct cs_queue new_message_queue
Definition: totemsrp.c:368
totemsrp_stats_t::recovery_token_lost
uint64_t recovery_token_lost
Definition: totemstats.h:75
totemsrp_instance::retrans_message_queue
struct cs_queue retrans_message_queue
Definition: totemsrp.c:372
totem_config::send_join_timeout
unsigned int send_join_timeout
Definition: totem.h:186
totem_config::window_size
unsigned int window_size
Definition: totem.h:210
PROCESSOR_COUNT_MAX
#define PROCESSOR_COUNT_MAX
Definition: coroapi.h:96
totemsrp_stats_t::commit_entered
uint64_t commit_entered
Definition: totemstats.h:72
received_flg
unsigned int received_flg
Definition: totemsrp.c:5
config.h
orf_token::rtr_list
struct rtr_item rtr_list[0]
Definition: totemsrp.c:208
orf_token::backlog
unsigned int backlog
Definition: totemsrp.c:204
mcast::guarantee
int guarantee
Definition: totemsrp.c:187
message_item
Definition: totemsrp.c:264
totemnet_processor_count_set
int totemnet_processor_count_set(void *net_context, int processor_count)
Definition: totemnet.c:367
MEMB_STATE_COMMIT
@ MEMB_STATE_COMMIT
Definition: totemsrp.c:277
encapsulation_type
encapsulation_type
Definition: totemsrp.c:152
logsys.h
totemsrp_instance::flushing
int flushing
Definition: totemsrp.c:523
sort_queue_item::mcast
struct mcast * mcast
Definition: totemsrp.c:270
totem_config::threads
unsigned int threads
Definition: totem.h:204
totemsrp_initialize
int totemsrp_initialize(qb_loop_t *poll_handle, void **srp_context, struct totem_config *totem_config, totempg_stats_t *stats, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id), void(*waiting_trans_ack_cb_fn)(int waiting_trans_ack))
Create a protocol instance.
Definition: totemsrp.c:816
rtr_list_entries
int rtr_list_entries
Definition: totemsrp.c:11
totem_logging_configuration::log_printf
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf
Definition: totem.h:99
memb_state
memb_state
Definition: totemsrp.c:274
totemnet_member_remove
int totemnet_member_remove(void *net_context, const struct totem_ip_address *member, int ring_no)
Definition: totemnet.c:524
totemsrp_instance::iface_changes
int iface_changes
Definition: totemsrp.c:282
totemsrp_instance::totemsrp_poll_handle
qb_loop_t * totemsrp_poll_handle
Definition: totemsrp.c:447
totemsrp_instance::totemsrp_log_level_notice
int totemsrp_log_level_notice
Definition: totemsrp.c:427
totemsrp_instance::memb_timer_state_gather_consensus_timeout
qb_loop_timer_handle memb_timer_state_gather_consensus_timeout
Definition: totemsrp.c:412
totemsrp_instance::my_left_memb_entries
int my_left_memb_entries
Definition: totemsrp.c:333
totemsrp_instance::mcast_address
struct totem_ip_address mcast_address
Definition: totemsrp.c:449
memb_commit_token::ring_id
struct memb_ring_id ring_id
Definition: totemsrp.c:251
totemsrp_instance::timer_orf_token_timeout
qb_loop_timer_handle timer_orf_token_timeout
Definition: totemsrp.c:400
totemsrp_instance::my_merge_detect_timeout_outstanding
int my_merge_detect_timeout_outstanding
Definition: totemsrp.c:343
totemsrp_instance::heartbeat_timeout
int heartbeat_timeout
Definition: totemsrp.c:363
memb_merge_detect::header
struct totem_message_header header
Definition: totemsrp.c:228
MESSAGE_TYPE_MEMB_JOIN
@ MESSAGE_TYPE_MEMB_JOIN
Definition: totemsrp.c:147
memb_join::system_from
struct srp_addr system_from
Definition: totemsrp.c:214
MESSAGE_TYPE_TOKEN_HOLD_CANCEL
@ MESSAGE_TYPE_TOKEN_HOLD_CANCEL
Definition: totemsrp.c:149
totemsrp_instance::totemsrp_log_level_trace
int totemsrp_log_level_trace
Definition: totemsrp.c:431
totemnet_recv_flush
int totemnet_recv_flush(void *net_context)
Definition: totemnet.c:378
totemnet_initialize
int totemnet_initialize(qb_loop_t *loop_pt, void **net_context, struct totem_config *totem_config, totemsrp_stats_t *stats, void *context, void(*deliver_fn)(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from), void(*iface_change_fn)(void *context, const struct totem_ip_address *iface_address, unsigned int ring_no), void(*mtu_changed)(void *context, int net_mtu), void(*target_set_completed)(void *context))
Definition: totemnet.c:301
totemsrp_my_nodeid_get
unsigned int totemsrp_my_nodeid_get(void *srp_context)
Definition: totemsrp.c:1099
TOTEM_CONFIGURATION_TRANSITIONAL
@ TOTEM_CONFIGURATION_TRANSITIONAL
Definition: coroapi.h:134
token_hold_cancel::ring_id
struct memb_ring_id ring_id
Definition: totemsrp.c:236
totemsrp_ifaces_get
int totemsrp_ifaces_get(void *srp_context, unsigned int nodeid, unsigned int *interface_id, struct totem_ip_address *interfaces, unsigned int interfaces_size, char ***status, unsigned int *iface_count)
Definition: totemsrp.c:1047
sort_queue_item
Definition: totemsrp.c:269
TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE
@ TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE
Definition: totemsrp.c:548
totemsrp_member_remove
int totemsrp_member_remove(void *context, const struct totem_ip_address *member, int iface_no)
Definition: totemsrp.c:5157
totemsrp_instance::timer_orf_token_hold_retransmit_timeout
qb_loop_timer_handle timer_orf_token_hold_retransmit_timeout
Definition: totemsrp.c:406
totemsrp_instance::my_memb_entries
int my_memb_entries
Definition: totemsrp.c:329
SEQNO_START_MSG
#define SEQNO_START_MSG
Definition: totemsrp.c:118
guarantee
int guarantee
Definition: totemsrp.c:8
mcast::this_seqno
int this_seqno
Definition: totemsrp.c:184
totemsrp_instance::my_failed_list_entries
int my_failed_list_entries
Definition: totemsrp.c:323
memb_commit_token::addr_entries
int addr_entries
Definition: totemsrp.c:254
totemsrp_threaded_mode_enable
void totemsrp_threaded_mode_enable(void *context)
Definition: totemsrp.c:5170
totemsrp_instance::pause_timestamp
uint64_t pause_timestamp
Definition: totemsrp.c:509
token_callback_instance::callback_fn
int(* callback_fn)(enum totem_callback_token_type type, const void *)
Definition: totemsrp.c:168
totemnet_recv_mcast_empty
int totemnet_recv_mcast_empty(void *net_context)
Definition: totemnet.c:493
totemsrp_stats_t::orf_token_tx
uint64_t orf_token_tx
Definition: totemstats.h:55
totemsrp_mcast
int totemsrp_mcast(void *srp_context, struct iovec *iovec, unsigned int iov_len, int guarantee)
Multicast a message.
Definition: totemsrp.c:2456
totemsrp_instance::totemsrp_subsys_id
int totemsrp_subsys_id
Definition: totemsrp.c:433
totemsrp_instance::token_recv_event_handle
void * token_recv_event_handle
Definition: totemsrp.c:525
totemsrp_instance::my_leave_memb_list
unsigned int my_leave_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:319
totemsrp_instance::totemnet_context
void * totemnet_context
Definition: totemsrp.c:497
totemnet_iface_check
int totemnet_iface_check(void *net_context)
Definition: totemnet.c:436
totem_config::max_network_delay
unsigned int max_network_delay
Definition: totem.h:208
memb_join
Definition: totemsrp.c:212
totemsrp_instance::my_addrs
struct totem_ip_address my_addrs[INTERFACE_MAX]
Definition: totemsrp.c:303
totemsrp_instance::new_message_queue_trans
struct cs_queue new_message_queue_trans
Definition: totemsrp.c:370
memb_merge_detect::system_from
struct srp_addr system_from
Definition: totemsrp.c:229
totemsrp_callback_token_destroy
void totemsrp_callback_token_destroy(void *srp_context, void **handle_out)
Definition: totemsrp.c:3489
memb_commit_token_memb_entry::ring_id
struct memb_ring_id ring_id
Definition: totemsrp.c:241
orf_token::rtr_list_entries
int rtr_list_entries
Definition: totemsrp.c:207
totemsrp_instance::memb_timer_state_gather_join_timeout
qb_loop_timer_handle memb_timer_state_gather_join_timeout
Definition: totemsrp.c:410
totem_config::consensus_timeout
unsigned int consensus_timeout
Definition: totem.h:188
totemsrp_instance::token_callback_sent_listhead
struct qb_list_head token_callback_sent_listhead
Definition: totemsrp.c:387
totemsrp_stats_t::operational_token_lost
uint64_t operational_token_lost
Definition: totemstats.h:69
totemsrp_instance::waiting_trans_ack
uint32_t waiting_trans_ack
Definition: totemsrp.c:521
totemsrp_instance::orf_token_retransmit_size
int orf_token_retransmit_size
Definition: totemsrp.c:391
token_hold_cancel::header
struct totem_message_header header
Definition: totemsrp.c:235
totemsrp_instance::originated_orf_token
uint32_t originated_orf_token
Definition: totemsrp.c:517
totemsrp_instance::my_new_memb_entries
int my_new_memb_entries
Definition: totemsrp.c:325
tv_old
unsigned long long int tv_old
Definition: totemsrp.c:3811
totemsrp_instance::memb_state
void(*) enum memb_stat memb_state)
Definition: totemsrp.c:443
totem_config::totem_logging_configuration
struct totem_logging_configuration totem_logging_configuration
Definition: totem.h:200