corosync  3.0.3
totempg.c
1 /*
2  * Copyright (c) 2003-2005 MontaVista Software, Inc.
3  * Copyright (c) 2005 OSDL.
4  * Copyright (c) 2006-2012 Red Hat, Inc.
5  *
6  * All rights reserved.
7  *
8  * Author: Steven Dake (sdake@redhat.com)
9  * Author: Mark Haverkamp (markh@osdl.org)
10  *
11  * This software licensed under BSD license, the text of which follows:
12  *
13  * Redistribution and use in source and binary forms, with or without
14  * modification, are permitted provided that the following conditions are met:
15  *
16  * - Redistributions of source code must retain the above copyright notice,
17  * this list of conditions and the following disclaimer.
18  * - Redistributions in binary form must reproduce the above copyright notice,
19  * this list of conditions and the following disclaimer in the documentation
20  * and/or other materials provided with the distribution.
21  * - Neither the name of the MontaVista Software, Inc. nor the names of its
22  * contributors may be used to endorse or promote products derived from this
23  * software without specific prior written permission.
24  *
25  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
26  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
28  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
29  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
30  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
31  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
32  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
33  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
34  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
35  * THE POSSIBILITY OF SUCH DAMAGE.
36  */
37 
38 /*
39  * FRAGMENTATION AND PACKING ALGORITHM:
40  *
41  * Assemble the entire message into one buffer
42  * if full fragment
43  * store fragment into lengths list
44  * for each full fragment
45  * multicast fragment
46  * set length and fragment fields of pg message
47  * store remaining multicast into head of fragmentation data and set lens field
48  *
49  * If a message exceeds the maximum packet size allowed by the totem
50  * single ring protocol, the protocol could lose forward progress.
51  * Statically calculating the allowed data amount doesn't work because
52  * the amount of data allowed depends on the number of fragments in
53  * each message. In this implementation, the maximum fragment size
54  * is dynamically calculated for each fragment added to the message.
55 
56  * It is possible for a message to be two bytes short of the maximum
57  * packet size. This occurs when a message or collection of
58  * messages + the mcast header + the lens are two bytes short of the
59  * end of the packet. Since another len field consumes two bytes, the
60  * len field would consume the rest of the packet without room for data.
61  *
62  * One optimization would be to forgo the final len field and determine
63  * it from the size of the udp datagram. Then this condition would no
64  * longer occur.
65  */
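/*
 * Editorial note (not part of the original source): a short worked example of
 * the packing described above, assuming an illustrative net_mtu of 1500 bytes.
 * TOTEMPG_PACKET_SIZE is then 1500 - sizeof(struct totempg_mcast), and every
 * packed message also costs one 2-byte length entry, so a 4000 byte
 * application message goes out as two (roughly 1490 byte) fragments with the
 * "fragmented" field set, while the final short piece stays in
 * fragmentation_data so later small messages can be packed behind it.
 */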
66 
67 /*
68  * ASSEMBLY AND UNPACKING ALGORITHM:
69  *
70  * copy incoming packet into assembly data buffer indexed by current
71  * location of end of fragment
72  *
73  * if not fragmented
74  * deliver all messages in assembly data buffer
75  * else
76  * if msg_count > 1 and fragmented
77  * deliver all messages except last message in assembly data buffer
78  * copy last fragmented section to start of assembly data buffer
79  * else
80  * if msg_count = 1 and fragmented
81  * do nothing
82  *
83  */
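/*
 * Editorial note (not part of the original source): on the receive side this
 * means a packed mcast carrying, say, three messages where only the third is
 * fragmented delivers the first two immediately, moves the partial third
 * message to the start of the assembly buffer, and waits for a follow-up
 * mcast whose "continuation" number matches before the reassembled message
 * is delivered.
 */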
84 
85 #include <config.h>
86 
87 #ifdef HAVE_ALLOCA_H
88 #include <alloca.h>
89 #endif
90 #include <sys/types.h>
91 #include <sys/socket.h>
92 #include <netinet/in.h>
93 #include <arpa/inet.h>
94 #include <sys/uio.h>
95 #include <stdio.h>
96 #include <stdlib.h>
97 #include <string.h>
98 #include <assert.h>
99 #include <pthread.h>
100 #include <errno.h>
101 #include <limits.h>
102 
103 #include <corosync/swab.h>
104 #include <qb/qblist.h>
105 #include <qb/qbloop.h>
106 #include <qb/qbipcs.h>
107 #include <corosync/totem/totempg.h>
108 #define LOGSYS_UTILS_ONLY 1
109 #include <corosync/logsys.h>
110 
111 #include "util.h"
112 #include "totemsrp.h"
113 
114 struct totempg_mcast_header {
115  short version;
116  short type;
117 };
118 
119 #if !(defined(__i386__) || defined(__x86_64__))
120 /*
121  * Alignment is needed on architectures other than i386 or x86_64
122  */
123 #define TOTEMPG_NEED_ALIGN 1
124 #endif
125 
126 /*
127  * totempg_mcast structure
128  *
129  * header: Identify the mcast.
130  * fragmented: Set if this message continues into next message
131  * continuation: Set if this message is a continuation from last message
132  * msg_count Indicates how many packed messages are contained
133  * in the mcast.
134  * Also, the size of each packed message and the messages themselves are
135  * appended to the end of this structure when sent.
136  */
137 struct totempg_mcast {
138  struct totempg_mcast_header header;
139  unsigned char fragmented;
140  unsigned char continuation;
141  unsigned short msg_count;
142  /*
143  * short msg_len[msg_count];
144  */
145  /*
146  * data for messages
147  */
148 };
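/*
 * Editorial note (not part of the original source): the resulting wire layout
 * of one packed mcast is, in order:
 *
 *   struct totempg_mcast   header (version/type, fragmented, continuation, msg_count)
 *   unsigned short         msg_len[msg_count]
 *   unsigned char          packed message data (msg_len[0] + ... bytes)
 *
 * which is the layout totempg_deliver_fn() below walks when unpacking.
 */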
149 
150 /*
151  * Maximum packet size for totem pg messages
152  */
153 #define TOTEMPG_PACKET_SIZE (totempg_totem_config->net_mtu - \
154  sizeof (struct totempg_mcast))
155 
156 /*
157  * Local variables used for packing small messages
158  */
159 static unsigned short mcast_packed_msg_lens[FRAME_SIZE_MAX];
160 
161 static int mcast_packed_msg_count = 0;
162 
163 static int totempg_reserved = 1;
164 
165 static unsigned int totempg_size_limit;
166 
167 static totem_queue_level_changed_fn totem_queue_level_changed = NULL;
168 
169 static uint32_t totempg_threaded_mode = 0;
170 
171 static void *totemsrp_context;
172 
173 /*
174  * Function and data used to log messages
175  */
176 static int totempg_log_level_security;
177 static int totempg_log_level_error;
178 static int totempg_log_level_warning;
179 static int totempg_log_level_notice;
180 static int totempg_log_level_debug;
181 static int totempg_subsys_id;
182 static void (*totempg_log_printf) (
183  int level,
184  int subsys,
185  const char *function,
186  const char *file,
187  int line,
188  const char *format, ...) __attribute__((format(printf, 6, 7)));
189 
190 static struct totem_config *totempg_totem_config;
191 
192 static totempg_stats_t totempg_stats;
193 
194 enum throw_away_mode {
195  THROW_AWAY_INACTIVE,
196  THROW_AWAY_ACTIVE
197 };
198 
199 struct assembly {
200  unsigned int nodeid;
201  unsigned char data[MESSAGE_SIZE_MAX+KNET_MAX_PACKET_SIZE];
202  int index;
203  unsigned char last_frag_num;
204  enum throw_away_mode throw_away_mode;
205  struct qb_list_head list;
206 };
207 
208 static void assembly_deref (struct assembly *assembly);
209 
210 static int callback_token_received_fn (enum totem_callback_token_type type,
211  const void *data);
212 
213 QB_LIST_DECLARE(assembly_list_inuse);
214 
215 /*
216  * Free list is used both for transitional and operational assemblies
217  */
218 QB_LIST_DECLARE(assembly_list_free);
219 
220 QB_LIST_DECLARE(assembly_list_inuse_trans);
221 
222 QB_LIST_DECLARE(totempg_groups_list);
223 
224 /*
225  * Staging buffer for packed messages. Messages are staged in this buffer
226  * before sending. Multiple messages may fit which cuts down on the
227  * number of mcasts sent. If a message doesn't completely fit, then
228  * the mcast header has a fragment bit set that says that there are more
229  * data to follow. fragment_size is an index into the buffer. It indicates
230  * the size of message data and where to place new message data.
231  * fragment_continuation indicates whether the first packed message in
232  * the buffer is a continuation of a previously packed fragment.
233  */
234 static unsigned char *fragmentation_data;
235 
236 static int fragment_size = 0;
237 
238 static int fragment_continuation = 0;
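/*
 * Editorial note (not part of the original source): a sketch of how these
 * variables evolve.  Packing two 100 byte messages leaves fragment_size at
 * 200 with mcast_packed_msg_count == 2; the staged data is only multicast
 * when the token callback fires or a later message no longer fits.  When a
 * message has to be split, the part already sent sets fragment_continuation
 * so the next mcast built from this buffer is marked as a continuation.
 */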
239 
240 static int totempg_waiting_transack = 0;
241 
242 struct totempg_group_instance {
243  void (*deliver_fn) (
244  unsigned int nodeid,
245  const void *msg,
246  unsigned int msg_len,
247  int endian_conversion_required);
248 
249  void (*confchg_fn) (
250  enum totem_configuration_type configuration_type,
251  const unsigned int *member_list, size_t member_list_entries,
252  const unsigned int *left_list, size_t left_list_entries,
253  const unsigned int *joined_list, size_t joined_list_entries,
254  const struct memb_ring_id *ring_id);
255 
256  struct totempg_group *groups;
257 
258  int groups_cnt;
259  int32_t q_level;
260 
261  struct qb_list_head list;
262 };
263 
264 static unsigned char next_fragment = 1;
265 
266 static pthread_mutex_t totempg_mutex = PTHREAD_MUTEX_INITIALIZER;
267 
268 static pthread_mutex_t callback_token_mutex = PTHREAD_MUTEX_INITIALIZER;
269 
270 static pthread_mutex_t mcast_msg_mutex = PTHREAD_MUTEX_INITIALIZER;
271 
272 #define log_printf(level, format, args...) \
273 do { \
274  totempg_log_printf(level, \
275  totempg_subsys_id, \
276  __FUNCTION__, __FILE__, __LINE__, \
277  format, ##args); \
278 } while (0);
279 
280 static int msg_count_send_ok (int msg_count);
281 
282 static int byte_count_send_ok (int byte_count);
283 
284 static void totempg_waiting_trans_ack_cb (int waiting_trans_ack)
285 {
286  log_printf(LOG_DEBUG, "waiting_trans_ack changed to %u", waiting_trans_ack);
287  totempg_waiting_transack = waiting_trans_ack;
288 }
289 
290 static struct assembly *assembly_ref (unsigned int nodeid)
291 {
292  struct assembly *assembly;
293  struct qb_list_head *list;
294  struct qb_list_head *active_assembly_list_inuse;
295 
296  if (totempg_waiting_transack) {
297  active_assembly_list_inuse = &assembly_list_inuse_trans;
298  } else {
299  active_assembly_list_inuse = &assembly_list_inuse;
300  }
301 
302  /*
303  * Search inuse list for node id and return assembly buffer if found
304  */
305  qb_list_for_each(list, active_assembly_list_inuse) {
306  assembly = qb_list_entry (list, struct assembly, list);
307 
308  if (nodeid == assembly->nodeid) {
309  return (assembly);
310  }
311  }
312 
313  /*
314  * Nothing found in inuse list get one from free list if available
315  */
316  if (qb_list_empty (&assembly_list_free) == 0) {
317  assembly = qb_list_first_entry (&assembly_list_free, struct assembly, list);
318  qb_list_del (&assembly->list);
319  qb_list_add (&assembly->list, active_assembly_list_inuse);
320  assembly->nodeid = nodeid;
321  assembly->index = 0;
322  assembly->last_frag_num = 0;
323  assembly->throw_away_mode = THROW_AWAY_INACTIVE;
324  return (assembly);
325  }
326 
327  /*
328  * Nothing available in inuse or free list, so allocate a new one
329  */
330  assembly = malloc (sizeof (struct assembly));
331  /*
332  * TODO handle memory allocation failure here
333  */
334  assert (assembly);
335  assembly->nodeid = nodeid;
336  assembly->data[0] = 0;
337  assembly->index = 0;
338  assembly->last_frag_num = 0;
339  assembly->throw_away_mode = THROW_AWAY_INACTIVE;
340  qb_list_init (&assembly->list);
341  qb_list_add (&assembly->list, active_assembly_list_inuse);
342 
343  return (assembly);
344 }
345 
346 static void assembly_deref (struct assembly *assembly)
347 {
348  qb_list_del (&assembly->list);
349  qb_list_add (&assembly->list, &assembly_list_free);
350 }
351 
352 static void assembly_deref_from_normal_and_trans (int nodeid)
353 {
354  int j;
355  struct qb_list_head *list, *tmp_iter;
356  struct qb_list_head *active_assembly_list_inuse;
357  struct assembly *assembly;
358 
359  for (j = 0; j < 2; j++) {
360  if (j == 0) {
361  active_assembly_list_inuse = &assembly_list_inuse;
362  } else {
363  active_assembly_list_inuse = &assembly_list_inuse_trans;
364  }
365 
366  qb_list_for_each_safe(list, tmp_iter, active_assembly_list_inuse) {
367  assembly = qb_list_entry (list, struct assembly, list);
368 
369  if (nodeid == assembly->nodeid) {
370  qb_list_del (&assembly->list);
371  qb_list_add (&assembly->list, &assembly_list_free);
372  }
373  }
374  }
375 
376 }
377 
378 static inline void app_confchg_fn (
379  enum totem_configuration_type configuration_type,
380  const unsigned int *member_list, size_t member_list_entries,
381  const unsigned int *left_list, size_t left_list_entries,
382  const unsigned int *joined_list, size_t joined_list_entries,
383  const struct memb_ring_id *ring_id)
384 {
385  int i;
386  struct totempg_group_instance *instance;
387  struct qb_list_head *list;
388 
389  /*
390  * For every leaving processor, add to free list
391  * This also has the side effect of clearing out the dataset
392  * in the leaving processor's assembly buffer.
393  */
394  for (i = 0; i < left_list_entries; i++) {
395  assembly_deref_from_normal_and_trans (left_list[i]);
396  }
397 
398  qb_list_for_each(list, &totempg_groups_list) {
399  instance = qb_list_entry (list, struct totempg_group_instance, list);
400 
401  if (instance->confchg_fn) {
402  instance->confchg_fn (
403  configuration_type,
404  member_list,
405  member_list_entries,
406  left_list,
407  left_list_entries,
408  joined_list,
409  joined_list_entries,
410  ring_id);
411  }
412  }
413 }
414 
415 static inline void group_endian_convert (
416  void *msg,
417  int msg_len)
418 {
419  unsigned short *group_len;
420  int i;
421  char *aligned_msg;
422 
423 #ifdef TOTEMPG_NEED_ALIGN
424  /*
425  * Align data structure for not i386 or x86_64
426  */
427  if ((size_t)msg % 4 != 0) {
428  aligned_msg = alloca(msg_len);
429  memcpy(aligned_msg, msg, msg_len);
430  } else {
431  aligned_msg = msg;
432  }
433 #else
434  aligned_msg = msg;
435 #endif
436 
437  group_len = (unsigned short *)aligned_msg;
438  group_len[0] = swab16(group_len[0]);
439  for (i = 1; i < group_len[0] + 1; i++) {
440  group_len[i] = swab16(group_len[i]);
441  }
442 
443  if (aligned_msg != msg) {
444  memcpy(msg, aligned_msg, msg_len);
445  }
446 }
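/*
 * Editorial note (not part of the original source): the group header that
 * group_endian_convert() and group_matches() parse looks like this for a
 * message sent to two groups named "a" and "bb":
 *
 *   unsigned short group_len[0] = 2     number of groups
 *   unsigned short group_len[1] = 1     length of "a"
 *   unsigned short group_len[2] = 2     length of "bb"
 *   char           "abb"                group names, not NUL terminated
 *   ...                                 application payload follows
 */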
447 
448 static inline int group_matches (
449  struct iovec *iovec,
450  unsigned int iov_len,
451  struct totempg_group *groups_b,
452  unsigned int group_b_cnt,
453  unsigned int *adjust_iovec)
454 {
455  unsigned short *group_len;
456  char *group_name;
457  int i;
458  int j;
459 #ifdef TOTEMPG_NEED_ALIGN
460  struct iovec iovec_aligned = { NULL, 0 };
461 #endif
462 
463  assert (iov_len == 1);
464 
465 #ifdef TOTEMPG_NEED_ALIGN
466  /*
467  * Align data structure for not i386 or x86_64
468  */
469  if ((size_t)iovec->iov_base % 4 != 0) {
470  iovec_aligned.iov_base = alloca(iovec->iov_len);
471  memcpy(iovec_aligned.iov_base, iovec->iov_base, iovec->iov_len);
472  iovec_aligned.iov_len = iovec->iov_len;
473  iovec = &iovec_aligned;
474  }
475 #endif
476 
477  group_len = (unsigned short *)iovec->iov_base;
478  group_name = ((char *)iovec->iov_base) +
479  sizeof (unsigned short) * (group_len[0] + 1);
480 
481 
482  /*
483  * Calculate amount to adjust the iovec by before delivering to app
484  */
485  *adjust_iovec = sizeof (unsigned short) * (group_len[0] + 1);
486  for (i = 1; i < group_len[0] + 1; i++) {
487  *adjust_iovec += group_len[i];
488  }
489 
490  /*
491  * Determine if this message should be delivered to this instance
492  */
493  for (i = 1; i < group_len[0] + 1; i++) {
494  for (j = 0; j < group_b_cnt; j++) {
495  if ((group_len[i] == groups_b[j].group_len) &&
496  (memcmp (groups_b[j].group, group_name, group_len[i]) == 0)) {
497  return (1);
498  }
499  }
500  group_name += group_len[i];
501  }
502  return (0);
503 }
504 
505 
506 static inline void app_deliver_fn (
507  unsigned int nodeid,
508  void *msg,
509  unsigned int msg_len,
510  int endian_conversion_required)
511 {
512  struct totempg_group_instance *instance;
513  struct iovec stripped_iovec;
514  unsigned int adjust_iovec;
515  struct iovec *iovec;
516  struct qb_list_head *list;
517 
518  struct iovec aligned_iovec = { NULL, 0 };
519 
520  if (endian_conversion_required) {
521  group_endian_convert (msg, msg_len);
522  }
523 
524  /*
525  * TODO: segmentation/assembly need to be redesigned to provide aligned access
526  * in all cases to avoid memory copies on non-i386 archs. Probably broke backwards
527  * compatibility
528  */
529 
530 #ifdef TOTEMPG_NEED_ALIGN
531  /*
532  * Align data structure for not i386 or x86_64
533  */
534  aligned_iovec.iov_base = alloca(msg_len);
535  aligned_iovec.iov_len = msg_len;
536  memcpy(aligned_iovec.iov_base, msg, msg_len);
537 #else
538  aligned_iovec.iov_base = msg;
539  aligned_iovec.iov_len = msg_len;
540 #endif
541 
542  iovec = &aligned_iovec;
543 
544  qb_list_for_each(list, &totempg_groups_list) {
545  instance = qb_list_entry (list, struct totempg_group_instance, list);
546  if (group_matches (iovec, 1, instance->groups, instance->groups_cnt, &adjust_iovec)) {
547  stripped_iovec.iov_len = iovec->iov_len - adjust_iovec;
548  stripped_iovec.iov_base = (char *)iovec->iov_base + adjust_iovec;
549 
550 #ifdef TOTEMPG_NEED_ALIGN
551  /*
552  * Align data structure for not i386 or x86_64
553  */
554  if (((size_t)((char *)iovec->iov_base + adjust_iovec)) % 4 != 0) {
555  /*
556  * Deal with misalignment
557  */
558  stripped_iovec.iov_base =
559  alloca (stripped_iovec.iov_len);
560  memcpy (stripped_iovec.iov_base,
561  (char *)iovec->iov_base + adjust_iovec,
562  stripped_iovec.iov_len);
563  }
564 #endif
565  instance->deliver_fn (
566  nodeid,
567  stripped_iovec.iov_base,
568  stripped_iovec.iov_len,
569  endian_conversion_required);
570  }
571  }
572 }
573 
574 static void totempg_confchg_fn (
575  enum totem_configuration_type configuration_type,
576  const unsigned int *member_list, size_t member_list_entries,
577  const unsigned int *left_list, size_t left_list_entries,
578  const unsigned int *joined_list, size_t joined_list_entries,
579  const struct memb_ring_id *ring_id)
580 {
581 // TODO optimize this
582  app_confchg_fn (configuration_type,
583  member_list, member_list_entries,
584  left_list, left_list_entries,
585  joined_list, joined_list_entries,
586  ring_id);
587 }
588 
589 static void totempg_deliver_fn (
590  unsigned int nodeid,
591  const void *msg,
592  unsigned int msg_len,
593  int endian_conversion_required)
594 {
595  struct totempg_mcast *mcast;
596  unsigned short *msg_lens;
597  int i;
598  struct assembly *assembly;
599  char header[FRAME_SIZE_MAX];
600  int msg_count;
601  int continuation;
602  int start;
603  const char *data;
604  int datasize;
605  struct iovec iov_delv;
606  size_t expected_msg_len;
607 
608  assembly = assembly_ref (nodeid);
609  assert (assembly);
610 
611  if (msg_len < sizeof(struct totempg_mcast)) {
612  log_printf(LOG_WARNING,
613  "Message (totempg_mcast) received from node " CS_PRI_NODE_ID " is too short... Ignoring.", nodeid);
614 
615  return ;
616  }
617 
618  /*
619  * Assemble the header into one block of data and
620  * assemble the packet contents into one block of data to simplify delivery
621  */
622 
623  mcast = (struct totempg_mcast *)msg;
624  if (endian_conversion_required) {
625  mcast->msg_count = swab16 (mcast->msg_count);
626  }
627 
628  msg_count = mcast->msg_count;
629  datasize = sizeof (struct totempg_mcast) +
630  msg_count * sizeof (unsigned short);
631 
632  if (msg_len < datasize) {
633  log_printf(LOG_WARNING,
634  "Message (totempg_mcast datasize) received from node " CS_PRI_NODE_ID
635  " is too short... Ignoring.", nodeid);
636 
637  return ;
638  }
639 
640  memcpy (header, msg, datasize);
641  data = msg;
642 
643  msg_lens = (unsigned short *) (header + sizeof (struct totempg_mcast));
644  expected_msg_len = datasize;
645  for (i = 0; i < mcast->msg_count; i++) {
646  if (endian_conversion_required) {
647  msg_lens[i] = swab16 (msg_lens[i]);
648  }
649 
650  expected_msg_len += msg_lens[i];
651  }
652 
653  if (msg_len != expected_msg_len) {
654  log_printf(LOG_WARNING,
655  "Message (totempg_mcast) received from node " CS_PRI_NODE_ID
656  " doesn't have expected length of %zu (has %u) bytes... Ignoring.",
657  nodeid, expected_msg_len, msg_len);
658 
659  return ;
660  }
661 
662  assert((assembly->index+msg_len) < sizeof(assembly->data));
663  memcpy (&assembly->data[assembly->index], &data[datasize],
664  msg_len - datasize);
665 
666  /*
667  * If the last message in the buffer is a fragment, then we
668  * can't deliver it. We'll first deliver the full messages
669  * then adjust the assembly buffer so we can add the rest of the
670  * fragment when it arrives.
671  */
672  msg_count = mcast->fragmented ? mcast->msg_count - 1 : mcast->msg_count;
673  continuation = mcast->continuation;
674  iov_delv.iov_base = (void *)&assembly->data[0];
675  iov_delv.iov_len = assembly->index + msg_lens[0];
676 
677  /*
678  * Make sure that if this message is a continuation, that it
679  * matches the sequence number of the previous fragment.
680  * Also, if the first packed message is a continuation
681  * of a previous message, but the assembly buffer
682  * is empty, then we need to discard it since we can't
683  * assemble a complete message. Likewise, if this message isn't a
684  * continuation and the assembly buffer is empty, we have to discard
685  * the continued message.
686  */
687  start = 0;
688 
689  if (assembly->throw_away_mode == THROW_AWAY_ACTIVE) {
690  /* Throw away the first msg block */
691  if (mcast->fragmented == 0 || mcast->fragmented == 1) {
692  assembly->throw_away_mode = THROW_AWAY_INACTIVE;
693 
694  assembly->index += msg_lens[0];
695  iov_delv.iov_base = (void *)&assembly->data[assembly->index];
696  iov_delv.iov_len = msg_lens[1];
697  start = 1;
698  }
699  } else
700  if (assembly->throw_away_mode == THROW_AWAY_INACTIVE) {
701  if (continuation == assembly->last_frag_num) {
702  assembly->last_frag_num = mcast->fragmented;
703  for (i = start; i < msg_count; i++) {
704  app_deliver_fn(nodeid, iov_delv.iov_base, iov_delv.iov_len,
705  endian_conversion_required);
706  assembly->index += msg_lens[i];
707  iov_delv.iov_base = (void *)&assembly->data[assembly->index];
708  if (i < (msg_count - 1)) {
709  iov_delv.iov_len = msg_lens[i + 1];
710  }
711  }
712  } else {
713  log_printf (LOG_DEBUG, "fragmented continuation %u is not equal to assembly last_frag_num %u",
714  continuation, assembly->last_frag_num);
715  assembly->throw_away_mode = THROW_AWAY_ACTIVE;
716  }
717  }
718 
719  if (mcast->fragmented == 0) {
720  /*
721  * End of messages, dereference assembly struct
722  */
723  assembly->last_frag_num = 0;
724  assembly->index = 0;
725  assembly_deref (assembly);
726  } else {
727  /*
728  * Message is fragmented, keep around assembly list
729  */
730  if (mcast->msg_count > 1) {
731  memmove (&assembly->data[0],
732  &assembly->data[assembly->index],
733  msg_lens[msg_count]);
734 
735  assembly->index = 0;
736  }
737  assembly->index += msg_lens[msg_count];
738  }
739 }
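/*
 * Editorial note (not part of the original source): as a concrete example of
 * the reassembly above, a message split over two mcasts arrives as
 * (fragmented = 1, msg_count = 1) followed by (continuation = 1,
 * fragmented = 0, msg_count = 1).  The first call only copies the data and
 * remembers last_frag_num = 1; the second call sees the matching continuation
 * number, appends its data and delivers the whole message, after which the
 * assembly buffer is returned to the free list.
 */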
740 
741 /*
742  * Totem Process Group Abstraction
743  * depends on poll abstraction, POSIX, IPV4
744  */
745 
746 void *callback_token_received_handle;
747 
748 int callback_token_received_fn (enum totem_callback_token_type type,
749  const void *data)
750 {
751  struct totempg_mcast mcast;
752  struct iovec iovecs[3];
753 
754  if (totempg_threaded_mode == 1) {
755  pthread_mutex_lock (&mcast_msg_mutex);
756  }
757  if (mcast_packed_msg_count == 0) {
758  if (totempg_threaded_mode == 1) {
759  pthread_mutex_unlock (&mcast_msg_mutex);
760  }
761  return (0);
762  }
763  if (totemsrp_avail(totemsrp_context) == 0) {
764  if (totempg_threaded_mode == 1) {
765  pthread_mutex_unlock (&mcast_msg_mutex);
766  }
767  return (0);
768  }
769  mcast.header.version = 0;
770  mcast.header.type = 0;
771  mcast.fragmented = 0;
772 
773  /*
774  * Was the first message in this buffer a continuation of a
775  * fragmented message?
776  */
777  mcast.continuation = fragment_continuation;
778  fragment_continuation = 0;
779 
780  mcast.msg_count = mcast_packed_msg_count;
781 
782  iovecs[0].iov_base = (void *)&mcast;
783  iovecs[0].iov_len = sizeof (struct totempg_mcast);
784  iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
785  iovecs[1].iov_len = mcast_packed_msg_count * sizeof (unsigned short);
786  iovecs[2].iov_base = (void *)&fragmentation_data[0];
787  iovecs[2].iov_len = fragment_size;
788  (void)totemsrp_mcast (totemsrp_context, iovecs, 3, 0);
789 
790  mcast_packed_msg_count = 0;
791  fragment_size = 0;
792 
793  if (totempg_threaded_mode == 1) {
794  pthread_mutex_unlock (&mcast_msg_mutex);
795  }
796  return (0);
797 }
798 
799 /*
800  * Initialize the totem process group abstraction
801  */
802 int totempg_initialize (
803  qb_loop_t *poll_handle,
804  struct totem_config *totem_config)
805 {
806  int res;
807 
808  totempg_totem_config = totem_config;
809  totempg_log_level_security = totem_config->totem_logging_configuration.log_level_security;
810  totempg_log_level_error = totem_config->totem_logging_configuration.log_level_error;
811  totempg_log_level_warning = totem_config->totem_logging_configuration.log_level_warning;
812  totempg_log_level_notice = totem_config->totem_logging_configuration.log_level_notice;
813  totempg_log_level_debug = totem_config->totem_logging_configuration.log_level_debug;
814  totempg_log_printf = totem_config->totem_logging_configuration.log_printf;
815  totempg_subsys_id = totem_config->totem_logging_configuration.log_subsys_id;
816 
817  fragmentation_data = malloc (TOTEMPG_PACKET_SIZE);
818  if (fragmentation_data == 0) {
819  return (-1);
820  }
821 
822  totemsrp_net_mtu_adjust (totem_config);
823 
824  res = totemsrp_initialize (
825  poll_handle,
826  &totemsrp_context,
827  totem_config,
828  &totempg_stats,
829  totempg_deliver_fn,
830  totempg_confchg_fn,
831  totempg_waiting_trans_ack_cb);
832 
833  if (res == -1) {
834  goto error_exit;
835  }
836 
837  totemsrp_callback_token_create (
838  totemsrp_context,
839  &callback_token_received_handle,
840  TOTEM_CALLBACK_TOKEN_RECEIVED,
841  0,
842  callback_token_received_fn,
843  0);
844 
845  totempg_size_limit = (totemsrp_avail(totemsrp_context) - 1) *
846  (totempg_totem_config->net_mtu -
847  sizeof (struct totempg_mcast) - 16);
848 
849  qb_list_init (&totempg_groups_list);
850 
851 error_exit:
852  return (res);
853 }
854 
855 void totempg_finalize (void)
856 {
857  if (totempg_threaded_mode == 1) {
858  pthread_mutex_lock (&totempg_mutex);
859  }
860  totemsrp_finalize (totemsrp_context);
861  if (totempg_threaded_mode == 1) {
862  pthread_mutex_unlock (&totempg_mutex);
863  }
864 }
865 
866 /*
867  * Multicast a message
868  */
869 static int mcast_msg (
870  struct iovec *iovec_in,
871  unsigned int iov_len,
872  int guarantee)
873 {
874  int res = 0;
875  struct totempg_mcast mcast;
876  struct iovec iovecs[3];
877  struct iovec iovec[64];
878  int i;
879  int dest, src;
880  int max_packet_size = 0;
881  int copy_len = 0;
882  int copy_base = 0;
883  int total_size = 0;
884 
885  if (totempg_threaded_mode == 1) {
886  pthread_mutex_lock (&mcast_msg_mutex);
887  }
888  totemsrp_event_signal (totemsrp_context, TOTEM_EVENT_NEW_MSG, 1);
889 
890  /*
891  * Remove zero length iovectors from the list
892  */
893  assert (iov_len < 64);
894  for (dest = 0, src = 0; src < iov_len; src++) {
895  if (iovec_in[src].iov_len) {
896  memcpy (&iovec[dest++], &iovec_in[src],
897  sizeof (struct iovec));
898  }
899  }
900  iov_len = dest;
901 
902  max_packet_size = TOTEMPG_PACKET_SIZE -
903  (sizeof (unsigned short) * (mcast_packed_msg_count + 1));
904 
905  mcast_packed_msg_lens[mcast_packed_msg_count] = 0;
906 
907  /*
908  * Check if we would overwrite new message queue
909  */
910  for (i = 0; i < iov_len; i++) {
911  total_size += iovec[i].iov_len;
912  }
913 
914  if (byte_count_send_ok (total_size + sizeof(unsigned short) *
915  (mcast_packed_msg_count)) == 0) {
916 
917  if (totempg_threaded_mode == 1) {
918  pthread_mutex_unlock (&mcast_msg_mutex);
919  }
920  return(-1);
921  }
922 
923  memset(&mcast, 0, sizeof(mcast));
924 
925  mcast.header.version = 0;
926  for (i = 0; i < iov_len; ) {
927  mcast.fragmented = 0;
928  mcast.continuation = fragment_continuation;
929  copy_len = iovec[i].iov_len - copy_base;
930 
931  /*
932  * If it all fits with room left over, copy it in.
933  * We need to leave at least sizeof(short) + 1 bytes in the
934  * fragment_buffer on exit so that max_packet_size + fragment_size
935  * doesn't exceed the size of the fragment_buffer on the next call.
936  */
937  if ((iovec[i].iov_len + fragment_size) <
938  (max_packet_size - sizeof (unsigned short))) {
939 
940  memcpy (&fragmentation_data[fragment_size],
941  (char *)iovec[i].iov_base + copy_base, copy_len);
942  fragment_size += copy_len;
943  mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;
944  next_fragment = 1;
945  copy_len = 0;
946  copy_base = 0;
947  i++;
948  continue;
949 
950  /*
951  * If it just fits or is too big, then send out what fits.
952  */
953  } else {
954  unsigned char *data_ptr;
955 
956  copy_len = min(copy_len, max_packet_size - fragment_size);
957  if( copy_len == max_packet_size )
958  data_ptr = (unsigned char *)iovec[i].iov_base + copy_base;
959  else {
960  data_ptr = fragmentation_data;
961  }
962 
963  memcpy (&fragmentation_data[fragment_size],
964  (unsigned char *)iovec[i].iov_base + copy_base, copy_len);
965  mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;
966 
967  /*
968  * if we're not on the last iovec or the iovec is too large to
969  * fit, then indicate a fragment. This also means that the next
970  * message will have the continuation of this one.
971  */
972  if ((i < (iov_len - 1)) ||
973  ((copy_base + copy_len) < iovec[i].iov_len)) {
974  if (!next_fragment) {
975  next_fragment++;
976  }
977  fragment_continuation = next_fragment;
978  mcast.fragmented = next_fragment++;
979  assert(fragment_continuation != 0);
980  assert(mcast.fragmented != 0);
981  } else {
982  fragment_continuation = 0;
983  }
984 
985  /*
986  * assemble the message and send it
987  */
988  mcast.msg_count = ++mcast_packed_msg_count;
989  iovecs[0].iov_base = (void *)&mcast;
990  iovecs[0].iov_len = sizeof(struct totempg_mcast);
991  iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
992  iovecs[1].iov_len = mcast_packed_msg_count *
993  sizeof(unsigned short);
994  iovecs[2].iov_base = (void *)data_ptr;
995  iovecs[2].iov_len = fragment_size + copy_len;
996  assert (totemsrp_avail(totemsrp_context) > 0);
997  res = totemsrp_mcast (totemsrp_context, iovecs, 3, guarantee);
998  if (res == -1) {
999  goto error_exit;
1000  }
1001 
1002  /*
1003  * Recalculate counts and indexes for the next.
1004  */
1005  mcast_packed_msg_lens[0] = 0;
1006  mcast_packed_msg_count = 0;
1007  fragment_size = 0;
1008  max_packet_size = TOTEMPG_PACKET_SIZE - (sizeof(unsigned short));
1009 
1010  /*
1011  * If the iovec all fit, go to the next iovec
1012  */
1013  if ((copy_base + copy_len) == iovec[i].iov_len) {
1014  copy_len = 0;
1015  copy_base = 0;
1016  i++;
1017 
1018  /*
1019  * Continue with the rest of the current iovec.
1020  */
1021  } else {
1022  copy_base += copy_len;
1023  }
1024  }
1025  }
1026 
1027  /*
1028  * Bump only if we added message data. This may be zero if
1029  * the last buffer just fit into the fragmentation_data buffer
1030  * and we were at the last iovec.
1031  */
1032  if (mcast_packed_msg_lens[mcast_packed_msg_count]) {
1033  mcast_packed_msg_count++;
1034  }
1035 
1036 error_exit:
1037  if (totempg_threaded_mode == 1) {
1038  pthread_mutex_unlock (&mcast_msg_mutex);
1039  }
1040  return (res);
1041 }
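/*
 * Editorial note (not part of the original source): mcast_msg() is the send
 * side counterpart of totempg_deliver_fn().  Given the group header iovec
 * plus the application iovecs, it either appends everything to
 * fragmentation_data (a small message, sent later from the token callback)
 * or emits one totemsrp_mcast() per filled packet, marking each packet that
 * has more data to follow as fragmented so the receiver keeps assembling.
 */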
1042 
1043 /*
1044  * Determine if a message of msg_size could be queued
1045  */
1046 static int msg_count_send_ok (
1047  int msg_count)
1048 {
1049  int avail = 0;
1050 
1051  avail = totemsrp_avail (totemsrp_context);
1052  totempg_stats.msg_queue_avail = avail;
1053 
1054  return ((avail - totempg_reserved) > msg_count);
1055 }
1056 
1057 static int byte_count_send_ok (
1058  int byte_count)
1059 {
1060  unsigned int msg_count = 0;
1061  int avail = 0;
1062 
1063  avail = totemsrp_avail (totemsrp_context);
1064 
1065  msg_count = (byte_count / (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16)) + 1;
1066 
1067  return (avail >= msg_count);
1068 }
1069 
1070 static int send_reserve (
1071  int msg_size)
1072 {
1073  unsigned int msg_count = 0;
1074 
1075  msg_count = (msg_size / (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16)) + 1;
1076  totempg_reserved += msg_count;
1077  totempg_stats.msg_reserved = totempg_reserved;
1078 
1079  return (msg_count);
1080 }
1081 
1082 static void send_release (
1083  int msg_count)
1084 {
1085  totempg_reserved -= msg_count;
1086  totempg_stats.msg_reserved = totempg_reserved;
1087 }
1088 
1089 #ifndef HAVE_SMALL_MEMORY_FOOTPRINT
1090 #undef MESSAGE_QUEUE_MAX
1091 #define MESSAGE_QUEUE_MAX ((4 * MESSAGE_SIZE_MAX) / totempg_totem_config->net_mtu)
1092 #endif /* HAVE_SMALL_MEMORY_FOOTPRINT */
1093 
1094 static uint32_t q_level_precent_used(void)
1095 {
1096  return (100 - (((totemsrp_avail(totemsrp_context) - totempg_reserved) * 100) / MESSAGE_QUEUE_MAX));
1097 }
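/*
 * Editorial note (not part of the original source): with illustrative values
 * of MESSAGE_SIZE_MAX = 1 MiB and net_mtu = 1500 (assumed here, not taken
 * from this file), MESSAGE_QUEUE_MAX is about 2796 entries, so a queue with
 * 699 free unreserved slots would report roughly 75% used and push the
 * q_level to TOTEM_Q_LEVEL_CRITICAL in check_q_level() below.
 */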
1098 
1099 int totempg_callback_token_create (
1100  void **handle_out,
1101  enum totem_callback_token_type type,
1102  int delete,
1103  int (*callback_fn) (enum totem_callback_token_type type, const void *),
1104  const void *data)
1105 {
1106  unsigned int res;
1107  if (totempg_threaded_mode == 1) {
1108  pthread_mutex_lock (&callback_token_mutex);
1109  }
1110  res = totemsrp_callback_token_create (totemsrp_context, handle_out, type, delete,
1111  callback_fn, data);
1112  if (totempg_threaded_mode == 1) {
1113  pthread_mutex_unlock (&callback_token_mutex);
1114  }
1115  return (res);
1116 }
1117 
1118 void totempg_callback_token_destroy (
1119  void *handle_out)
1120 {
1121  if (totempg_threaded_mode == 1) {
1122  pthread_mutex_lock (&callback_token_mutex);
1123  }
1124  totemsrp_callback_token_destroy (totemsrp_context, handle_out);
1125  if (totempg_threaded_mode == 1) {
1126  pthread_mutex_unlock (&callback_token_mutex);
1127  }
1128 }
1129 
1130 /*
1131  * vi: set autoindent tabstop=4 shiftwidth=4 :
1132  */
1133 
1134 int totempg_groups_initialize (
1135  void **totempg_groups_instance,
1136 
1137  void (*deliver_fn) (
1138  unsigned int nodeid,
1139  const void *msg,
1140  unsigned int msg_len,
1141  int endian_conversion_required),
1142 
1143  void (*confchg_fn) (
1144  enum totem_configuration_type configuration_type,
1145  const unsigned int *member_list, size_t member_list_entries,
1146  const unsigned int *left_list, size_t left_list_entries,
1147  const unsigned int *joined_list, size_t joined_list_entries,
1148  const struct memb_ring_id *ring_id))
1149 {
1150  struct totempg_group_instance *instance;
1151 
1152  if (totempg_threaded_mode == 1) {
1153  pthread_mutex_lock (&totempg_mutex);
1154  }
1155 
1156  instance = malloc (sizeof (struct totempg_group_instance));
1157  if (instance == NULL) {
1158  goto error_exit;
1159  }
1160 
1161  instance->deliver_fn = deliver_fn;
1162  instance->confchg_fn = confchg_fn;
1163  instance->groups = 0;
1164  instance->groups_cnt = 0;
1165  instance->q_level = QB_LOOP_MED;
1166  qb_list_init (&instance->list);
1167  qb_list_add (&instance->list, &totempg_groups_list);
1168 
1169  if (totempg_threaded_mode == 1) {
1170  pthread_mutex_unlock (&totempg_mutex);
1171  }
1172  *totempg_groups_instance = instance;
1173  return (0);
1174 
1175 error_exit:
1176  if (totempg_threaded_mode == 1) {
1177  pthread_mutex_unlock (&totempg_mutex);
1178  }
1179  return (-1);
1180 }
1181 
1182 int totempg_groups_join (
1183  void *totempg_groups_instance,
1184  const struct totempg_group *groups,
1185  size_t group_cnt)
1186 {
1187  struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
1188  struct totempg_group *new_groups;
1189  int res = 0;
1190 
1191  if (totempg_threaded_mode == 1) {
1192  pthread_mutex_lock (&totempg_mutex);
1193  }
1194 
1195  new_groups = realloc (instance->groups,
1196  sizeof (struct totempg_group) *
1197  (instance->groups_cnt + group_cnt));
1198  if (new_groups == 0) {
1199  res = -1;
1200  goto error_exit;
1201  }
1202  memcpy (&new_groups[instance->groups_cnt],
1203  groups, group_cnt * sizeof (struct totempg_group));
1204  instance->groups = new_groups;
1205  instance->groups_cnt += group_cnt;
1206 
1207 error_exit:
1208  if (totempg_threaded_mode == 1) {
1209  pthread_mutex_unlock (&totempg_mutex);
1210  }
1211  return (res);
1212 }
1213 
1214 int totempg_groups_leave (
1215  void *totempg_groups_instance,
1216  const struct totempg_group *groups,
1217  size_t group_cnt)
1218 {
1219  if (totempg_threaded_mode == 1) {
1220  pthread_mutex_lock (&totempg_mutex);
1221  }
1222 
1223  if (totempg_threaded_mode == 1) {
1224  pthread_mutex_unlock (&totempg_mutex);
1225  }
1226  return (0);
1227 }
1228 
1229 #define MAX_IOVECS_FROM_APP 32
1230 #define MAX_GROUPS_PER_MSG 32
1231 
1232 int totempg_groups_mcast_joined (
1233  void *totempg_groups_instance,
1234  const struct iovec *iovec,
1235  unsigned int iov_len,
1236  int guarantee)
1237 {
1238  struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
1239  unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
1240  struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
1241  int i;
1242  unsigned int res;
1243 
1244  if (totempg_threaded_mode == 1) {
1245  pthread_mutex_lock (&totempg_mutex);
1246  }
1247 
1248  /*
1249  * Build group_len structure and the iovec_mcast structure
1250  */
1251  group_len[0] = instance->groups_cnt;
1252  for (i = 0; i < instance->groups_cnt; i++) {
1253  group_len[i + 1] = instance->groups[i].group_len;
1254  iovec_mcast[i + 1].iov_len = instance->groups[i].group_len;
1255  iovec_mcast[i + 1].iov_base = (void *) instance->groups[i].group;
1256  }
1257  iovec_mcast[0].iov_len = (instance->groups_cnt + 1) * sizeof (unsigned short);
1258  iovec_mcast[0].iov_base = group_len;
1259  for (i = 0; i < iov_len; i++) {
1260  iovec_mcast[i + instance->groups_cnt + 1].iov_len = iovec[i].iov_len;
1261  iovec_mcast[i + instance->groups_cnt + 1].iov_base = iovec[i].iov_base;
1262  }
1263 
1264  res = mcast_msg (iovec_mcast, iov_len + instance->groups_cnt + 1, guarantee);
1265 
1266  if (totempg_threaded_mode == 1) {
1267  pthread_mutex_unlock (&totempg_mutex);
1268  }
1269 
1270  return (res);
1271 }
1272 
1273 static void check_q_level(
1274  void *totempg_groups_instance)
1275 {
1276  struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
1277  int32_t old_level = instance->q_level;
1278  int32_t percent_used = q_level_precent_used();
1279 
1280  if (percent_used >= 75 && instance->q_level != TOTEM_Q_LEVEL_CRITICAL) {
1281  instance->q_level = TOTEM_Q_LEVEL_CRITICAL;
1282  } else if (percent_used < 30 && instance->q_level != TOTEM_Q_LEVEL_LOW) {
1283  instance->q_level = TOTEM_Q_LEVEL_LOW;
1284  } else if (percent_used > 40 && percent_used < 50 && instance->q_level != TOTEM_Q_LEVEL_GOOD) {
1285  instance->q_level = TOTEM_Q_LEVEL_GOOD;
1286  } else if (percent_used > 60 && percent_used < 70 && instance->q_level != TOTEM_Q_LEVEL_HIGH) {
1287  instance->q_level = TOTEM_Q_LEVEL_HIGH;
1288  }
1289  if (totem_queue_level_changed && old_level != instance->q_level) {
1290  totem_queue_level_changed(instance->q_level);
1291  }
1292 }
1293 
1294 void totempg_check_q_level (
1295  void *totempg_groups_instance)
1296 {
1297  struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
1298 
1299  check_q_level(instance);
1300 }
1301 
1302 int totempg_groups_joined_reserve (
1303  void *totempg_groups_instance,
1304  const struct iovec *iovec,
1305  unsigned int iov_len)
1306 {
1307  struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
1308  unsigned int size = 0;
1309  unsigned int i;
1310  unsigned int reserved = 0;
1311 
1312  if (totempg_threaded_mode == 1) {
1313  pthread_mutex_lock (&totempg_mutex);
1314  pthread_mutex_lock (&mcast_msg_mutex);
1315  }
1316 
1317  for (i = 0; i < instance->groups_cnt; i++) {
1318  size += instance->groups[i].group_len;
1319  }
1320  for (i = 0; i < iov_len; i++) {
1321  size += iovec[i].iov_len;
1322  }
1323 
1324  if (size >= totempg_size_limit) {
1325  reserved = -1;
1326  goto error_exit;
1327  }
1328 
1329  if (byte_count_send_ok (size)) {
1330  reserved = send_reserve (size);
1331  } else {
1332  reserved = 0;
1333  }
1334 
1335 error_exit:
1336  check_q_level(instance);
1337 
1338  if (totempg_threaded_mode == 1) {
1339  pthread_mutex_unlock (&mcast_msg_mutex);
1340  pthread_mutex_unlock (&totempg_mutex);
1341  }
1342  return (reserved);
1343 }
1344 
1345 
1346 int totempg_groups_joined_release (int msg_count)
1347 {
1348  if (totempg_threaded_mode == 1) {
1349  pthread_mutex_lock (&totempg_mutex);
1350  pthread_mutex_lock (&mcast_msg_mutex);
1351  }
1352  send_release (msg_count);
1353  if (totempg_threaded_mode == 1) {
1354  pthread_mutex_unlock (&mcast_msg_mutex);
1355  pthread_mutex_unlock (&totempg_mutex);
1356  }
1357  return 0;
1358 }
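/*
 * Editorial note (not part of the original source): a typical caller pairs
 * these two entry points: it calls totempg_groups_joined_reserve() before
 * queueing a message, refuses the send when 0 is returned (queue full) and,
 * once the message has been handed to totempg_groups_mcast_joined(), gives
 * the reservation back with totempg_groups_joined_release().
 */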
1359 
1360 int totempg_groups_mcast_groups (
1361  void *totempg_groups_instance,
1362  int guarantee,
1363  const struct totempg_group *groups,
1364  size_t groups_cnt,
1365  const struct iovec *iovec,
1366  unsigned int iov_len)
1367 {
1368  unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
1369  struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
1370  int i;
1371  unsigned int res;
1372 
1373  if (totempg_threaded_mode == 1) {
1374  pthread_mutex_lock (&totempg_mutex);
1375  }
1376 
1377  /*
1378  * Build group_len structure and the iovec_mcast structure
1379  */
1380  group_len[0] = groups_cnt;
1381  for (i = 0; i < groups_cnt; i++) {
1382  group_len[i + 1] = groups[i].group_len;
1383  iovec_mcast[i + 1].iov_len = groups[i].group_len;
1384  iovec_mcast[i + 1].iov_base = (void *) groups[i].group;
1385  }
1386  iovec_mcast[0].iov_len = (groups_cnt + 1) * sizeof (unsigned short);
1387  iovec_mcast[0].iov_base = group_len;
1388  for (i = 0; i < iov_len; i++) {
1389  iovec_mcast[i + groups_cnt + 1].iov_len = iovec[i].iov_len;
1390  iovec_mcast[i + groups_cnt + 1].iov_base = iovec[i].iov_base;
1391  }
1392 
1393  res = mcast_msg (iovec_mcast, iov_len + groups_cnt + 1, guarantee);
1394 
1395  if (totempg_threaded_mode == 1) {
1396  pthread_mutex_unlock (&totempg_mutex);
1397  }
1398  return (res);
1399 }
1400 
1401 /*
1402  * Returns -1 if error, 0 if can't send, 1 if can send the message
1403  */
1404 int totempg_groups_send_ok_groups (
1405  void *totempg_groups_instance,
1406  const struct totempg_group *groups,
1407  size_t groups_cnt,
1408  const struct iovec *iovec,
1409  unsigned int iov_len)
1410 {
1411  unsigned int size = 0;
1412  unsigned int i;
1413  unsigned int res;
1414 
1415  if (totempg_threaded_mode == 1) {
1416  pthread_mutex_lock (&totempg_mutex);
1417  }
1418 
1419  for (i = 0; i < groups_cnt; i++) {
1420  size += groups[i].group_len;
1421  }
1422  for (i = 0; i < iov_len; i++) {
1423  size += iovec[i].iov_len;
1424  }
1425 
1426  res = msg_count_send_ok (size);
1427 
1428  if (totempg_threaded_mode == 1) {
1429  pthread_mutex_unlock (&totempg_mutex);
1430  }
1431  return (res);
1432 }
1433 
1434 int totempg_iface_set (
1435  struct totem_ip_address *interface_addr,
1436  unsigned short ip_port,
1437  unsigned int iface_no)
1438 {
1439  int res;
1440 
1441  res = totemsrp_iface_set (
1442  totemsrp_context,
1443  interface_addr,
1444  ip_port,
1445  iface_no);
1446 
1447  return (res);
1448 }
1449 
1450 int totempg_ifaces_get (
1451  unsigned int nodeid,
1452  unsigned int *interface_id,
1453  struct totem_ip_address *interfaces,
1454  unsigned int interfaces_size,
1455  char ***status,
1456  unsigned int *iface_count)
1457 {
1458  int res;
1459 
1460  res = totemsrp_ifaces_get (
1461  totemsrp_context,
1462  nodeid,
1463  interface_id,
1464  interfaces,
1465  interfaces_size,
1466  status,
1467  iface_count);
1468 
1469  return (res);
1470 }
1471 
1472 void totempg_event_signal (enum totem_event_type type, int value)
1473 {
1474  totemsrp_event_signal (totemsrp_context, type, value);
1475 }
1476 
1477 void* totempg_get_stats (void)
1478 {
1479  return &totempg_stats;
1480 }
1481 
1482 int totempg_crypto_set (
1483  const char *cipher_type,
1484  const char *hash_type)
1485 {
1486  int res;
1487 
1488  res = totemsrp_crypto_set (totemsrp_context, cipher_type, hash_type);
1489 
1490  return (res);
1491 }
1492 
1493 #define ONE_IFACE_LEN 63
1494 const char *totempg_ifaces_print (unsigned int nodeid)
1495 {
1496  static char iface_string[256 * INTERFACE_MAX];
1497  char one_iface[ONE_IFACE_LEN+1];
1498  struct totem_ip_address interfaces[INTERFACE_MAX];
1499  unsigned int iface_count;
1500  unsigned int iface_ids[INTERFACE_MAX];
1501  unsigned int i;
1502  int res;
1503 
1504  iface_string[0] = '\0';
1505 
1506  res = totempg_ifaces_get (nodeid, iface_ids, interfaces, INTERFACE_MAX, NULL, &iface_count);
1507  if (res == -1) {
1508  return ("no interface found for nodeid");
1509  }
1510 
1511  res = totempg_ifaces_get (nodeid, iface_ids, interfaces, INTERFACE_MAX, NULL, &iface_count);
1512 
1513  for (i = 0; i < iface_count; i++) {
1514  if (!interfaces[i].family) {
1515  continue;
1516  }
1517  snprintf (one_iface, ONE_IFACE_LEN,
1518  "r(%d) ip(%s) ",
1519  i, totemip_print (&interfaces[i]));
1520  strcat (iface_string, one_iface);
1521  }
1522  return (iface_string);
1523 }
1524 
1525 unsigned int totempg_my_nodeid_get (void)
1526 {
1527  return (totemsrp_my_nodeid_get(totemsrp_context));
1528 }
1529 
1530 int totempg_my_family_get (void)
1531 {
1532  return (totemsrp_my_family_get(totemsrp_context));
1533 }
1534 void totempg_service_ready_register (
1535  void (*totem_service_ready) (void))
1536 {
1537  totemsrp_service_ready_register (totemsrp_context, totem_service_ready);
1538 }
1539 
1540 void totempg_queue_level_register_callback (totem_queue_level_changed_fn fn)
1541 {
1542  totem_queue_level_changed = fn;
1543 }
1544 
1545 int totempg_member_add (
1546  const struct totem_ip_address *member,
1547  int ring_no)
1548 {
1549  return totemsrp_member_add (totemsrp_context, member, ring_no);
1550 }
1551 
1552 int totempg_member_remove (
1553  const struct totem_ip_address *member,
1554  int ring_no)
1555 {
1556  return totemsrp_member_remove (totemsrp_context, member, ring_no);
1557 }
1558 
1559 extern int totempg_reconfigure (void)
1560 {
1561  return totemsrp_reconfigure (totemsrp_context, totempg_totem_config);
1562 }
1563 
1564 extern void totempg_stats_clear (int flags)
1565 {
1566  if (flags & TOTEMPG_STATS_CLEAR_TOTEM) {
1567  totempg_stats.msg_reserved = 0;
1568  totempg_stats.msg_queue_avail = 0;
1569  }
1570  return totemsrp_stats_clear (totemsrp_context, flags);
1571 }
1572 
1573 void totempg_threaded_mode_enable (void)
1574 {
1575  totempg_threaded_mode = 1;
1576  totemsrp_threaded_mode_enable (totemsrp_context);
1577 }
1578 
1579 void totempg_trans_ack (void)
1580 {
1581  totemsrp_trans_ack (totemsrp_context);
1582 }
1583 
1584 void totempg_force_gather (void)
1585 {
1586  totemsrp_force_gather(totemsrp_context);
1587 }