pacemaker  1.1.17-b36b869ca8
Scalable High-Availability cluster resource manager
remote.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2008 Andrew Beekhof
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with this library; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17  *
18  */
19 #include <crm_internal.h>
20 #include <crm/crm.h>
21 
22 #include <sys/param.h>
23 #include <stdio.h>
24 #include <sys/types.h>
25 #include <sys/stat.h>
26 #include <unistd.h>
27 #include <sys/socket.h>
28 #include <arpa/inet.h>
29 #include <netinet/in.h>
30 #include <netinet/ip.h>
31 #include <netinet/tcp.h>
32 #include <netdb.h>
33 
34 #include <stdlib.h>
35 #include <errno.h>
36 #include <fcntl.h>
37 #include <glib.h>
38 
39 #include <bzlib.h>
40 
41 #include <crm/common/ipcs.h>
42 #include <crm/common/xml.h>
43 #include <crm/common/mainloop.h>
44 
45 #ifdef HAVE_GNUTLS_GNUTLS_H
46 # undef KEYFILE
47 # include <gnutls/gnutls.h>
48 
49 const int psk_tls_kx_order[] = {
50  GNUTLS_KX_DHE_PSK,
51  GNUTLS_KX_PSK,
52 };
53 
54 const int anon_tls_kx_order[] = {
55  GNUTLS_KX_ANON_DH,
56  GNUTLS_KX_DHE_RSA,
57  GNUTLS_KX_DHE_DSS,
58  GNUTLS_KX_RSA,
59  0
60 };
61 #endif
62 
63 /* Swab macros from linux/swab.h */
64 #ifdef HAVE_LINUX_SWAB_H
65 # include <linux/swab.h>
66 #else
/*
 * Fallback byte-swap helpers for platforms without <linux/swab.h>.
 *
 * Every operand is explicitly cast to a fixed-width unsigned type so the
 * result does not depend on how integer constants or narrower arguments are
 * promoted. The wider swaps are composed from the narrower ones: swap each
 * half, then exchange the halves.
 */
#define __swab16(x) ((uint16_t)( \
    ((uint16_t)(x) << 8) | \
    ((uint16_t)(x) >> 8)))

#define __swab32(x) ((uint32_t)( \
    ((uint32_t)__swab16((uint32_t)(x)) << 16) | \
    (uint32_t)__swab16((uint32_t)(x) >> 16)))

#define __swab64(x) ((uint64_t)( \
    ((uint64_t)__swab32((uint64_t)(x)) << 32) | \
    (uint64_t)__swab32((uint64_t)(x) >> 32)))
90 #endif
91 
/* Message format version written into every outgoing header */
#define REMOTE_MSG_VERSION 1

/* Marker stored in the header's endian field in the sender's byte order */
#define ENDIAN_LOCAL 0xBADADBBD

/*
 * Fixed-size header that precedes every message on a remote connection.
 * The structure is packed, so its in-memory layout has no padding.
 */
struct crm_remote_header_v0
{
    uint32_t endian;    /* Detect messages from hosts with different endian-ness */
    uint32_t version;   /* Sender's REMOTE_MSG_VERSION */
    uint64_t id;        /* Per-sender message counter */
    uint64_t flags;
    uint32_t size_total;            /* Header plus payload, in bytes */
    uint32_t payload_offset;        /* Offset of the payload from the start of the header */
    uint32_t payload_compressed;    /* Compressed payload size, or 0 if not compressed */
    uint32_t payload_uncompressed;  /* Payload size including the terminating NUL */

    /* New fields get added here */

} __attribute__ ((packed));
109 
110 static struct crm_remote_header_v0 *
111 crm_remote_header(crm_remote_t * remote)
112 {
113  struct crm_remote_header_v0 *header = (struct crm_remote_header_v0 *)remote->buffer;
114  if(remote->buffer_offset < sizeof(struct crm_remote_header_v0)) {
115  return NULL;
116 
117  } else if(header->endian != ENDIAN_LOCAL) {
118  uint32_t endian = __swab32(header->endian);
119 
120  CRM_LOG_ASSERT(endian == ENDIAN_LOCAL);
121  if(endian != ENDIAN_LOCAL) {
122  crm_err("Invalid message detected, endian mismatch: %lx is neither %lx nor the swab'd %lx",
123  ENDIAN_LOCAL, header->endian, endian);
124  return NULL;
125  }
126 
127  header->id = __swab64(header->id);
128  header->flags = __swab64(header->flags);
129  header->endian = __swab32(header->endian);
130 
131  header->version = __swab32(header->version);
132  header->size_total = __swab32(header->size_total);
133  header->payload_offset = __swab32(header->payload_offset);
134  header->payload_compressed = __swab32(header->payload_compressed);
135  header->payload_uncompressed = __swab32(header->payload_uncompressed);
136  }
137 
138  return header;
139 }
140 
141 #ifdef HAVE_GNUTLS_GNUTLS_H
142 
143 int
144 crm_initiate_client_tls_handshake(crm_remote_t * remote, int timeout_ms)
145 {
146  int rc = 0;
147  int pollrc = 0;
148  time_t start = time(NULL);
149 
150  do {
151  rc = gnutls_handshake(*remote->tls_session);
152  if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
153  pollrc = crm_remote_ready(remote, 1000);
154  if (pollrc < 0) {
155  /* poll returned error, there is no hope */
156  rc = -1;
157  }
158  }
160  } while (((time(NULL) - start) < (timeout_ms / 1000)) &&
161  (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN));
162 
163  if (rc < 0) {
164  crm_trace("gnutls_handshake() failed with %d", rc);
165  }
166  return rc;
167 }
168 
169 void *
170 crm_create_anon_tls_session(int csock, int type /* GNUTLS_SERVER, GNUTLS_CLIENT */ ,
171  void *credentials)
172 {
173  gnutls_session_t *session = gnutls_malloc(sizeof(gnutls_session_t));
174 
175  gnutls_init(session, type);
176 # ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT
177 /* http://www.manpagez.com/info/gnutls/gnutls-2.10.4/gnutls_81.php#Echo-Server-with-anonymous-authentication */
178  gnutls_priority_set_direct(*session, "NORMAL:+ANON-DH", NULL);
179 /* gnutls_priority_set_direct (*session, "NONE:+VERS-TLS-ALL:+CIPHER-ALL:+MAC-ALL:+SIGN-ALL:+COMP-ALL:+ANON-DH", NULL); */
180 # else
181  gnutls_set_default_priority(*session);
182  gnutls_kx_set_priority(*session, anon_tls_kx_order);
183 # endif
184  gnutls_transport_set_ptr(*session, (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
185  switch (type) {
186  case GNUTLS_SERVER:
187  gnutls_credentials_set(*session, GNUTLS_CRD_ANON,
188  (gnutls_anon_server_credentials_t) credentials);
189  break;
190  case GNUTLS_CLIENT:
191  gnutls_credentials_set(*session, GNUTLS_CRD_ANON,
192  (gnutls_anon_client_credentials_t) credentials);
193  break;
194  }
195 
196  return session;
197 }
198 
199 void *
200 create_psk_tls_session(int csock, int type /* GNUTLS_SERVER, GNUTLS_CLIENT */ , void *credentials)
201 {
202  gnutls_session_t *session = gnutls_malloc(sizeof(gnutls_session_t));
203 
204  gnutls_init(session, type);
205 # ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT
206  gnutls_priority_set_direct(*session, "NORMAL:+DHE-PSK:+PSK", NULL);
207 # else
208  gnutls_set_default_priority(*session);
209  gnutls_kx_set_priority(*session, psk_tls_kx_order);
210 # endif
211  gnutls_transport_set_ptr(*session, (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
212  switch (type) {
213  case GNUTLS_SERVER:
214  gnutls_credentials_set(*session, GNUTLS_CRD_PSK,
215  (gnutls_psk_server_credentials_t) credentials);
216  break;
217  case GNUTLS_CLIENT:
218  gnutls_credentials_set(*session, GNUTLS_CRD_PSK,
219  (gnutls_psk_client_credentials_t) credentials);
220  break;
221  }
222 
223  return session;
224 }
225 
226 static int
227 crm_send_tls(gnutls_session_t * session, const char *buf, size_t len)
228 {
229  const char *unsent = buf;
230  int rc = 0;
231  int total_send;
232 
233  if (buf == NULL) {
234  return -1;
235  }
236 
237  total_send = len;
238  crm_trace("Message size: %llu", (unsigned long long) len);
239 
240  while (TRUE) {
241  rc = gnutls_record_send(*session, unsent, len);
242 
243  if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
244  crm_debug("Retry");
245 
246  } else if (rc < 0) {
247  crm_err("Connection terminated rc = %d", rc);
248  break;
249 
250  } else if (rc < len) {
251  crm_debug("Sent %d of %llu bytes", rc, (unsigned long long) len);
252  len -= rc;
253  unsent += rc;
254  } else {
255  crm_trace("Sent all %d bytes", rc);
256  break;
257  }
258  }
259 
260  return rc < 0 ? rc : total_send;
261 }
262 #endif
263 
264 static int
265 crm_send_plaintext(int sock, const char *buf, size_t len)
266 {
267 
268  int rc = 0;
269  const char *unsent = buf;
270  int total_send;
271 
272  if (buf == NULL) {
273  return -1;
274  }
275  total_send = len;
276 
277  crm_trace("Message on socket %d: size=%llu",
278  sock, (unsigned long long) len);
279  retry:
280  rc = write(sock, unsent, len);
281  if (rc < 0) {
282  switch (errno) {
283  case EINTR:
284  case EAGAIN:
285  crm_trace("Retry");
286  goto retry;
287  default:
288  crm_perror(LOG_ERR, "Could only write %d of the remaining %d bytes", rc, (int)len);
289  break;
290  }
291 
292  } else if (rc < len) {
293  crm_trace("Only sent %d of %llu remaining bytes",
294  rc, (unsigned long long) len);
295  len -= rc;
296  unsent += rc;
297  goto retry;
298 
299  } else {
300  crm_trace("Sent %d bytes: %.100s", rc, buf);
301  }
302 
303  return rc < 0 ? rc : total_send;
304 
305 }
306 
307 static int
308 crm_remote_sendv(crm_remote_t * remote, struct iovec * iov, int iovs)
309 {
310  int lpc = 0;
311  int rc = -ESOCKTNOSUPPORT;
312 
313  for(; lpc < iovs; lpc++) {
314 
315 #ifdef HAVE_GNUTLS_GNUTLS_H
316  if (remote->tls_session) {
317  rc = crm_send_tls(remote->tls_session, iov[lpc].iov_base, iov[lpc].iov_len);
318  } else if (remote->tcp_socket) {
319 #else
320  if (remote->tcp_socket) {
321 #endif
322  rc = crm_send_plaintext(remote->tcp_socket, iov[lpc].iov_base, iov[lpc].iov_len);
323 
324  } else {
325  crm_err("Unsupported connection type");
326  }
327  }
328  return rc;
329 }
330 
331 int
332 crm_remote_send(crm_remote_t * remote, xmlNode * msg)
333 {
334  int rc = -1;
335  static uint64_t id = 0;
336  char *xml_text = dump_xml_unformatted(msg);
337 
338  struct iovec iov[2];
339  struct crm_remote_header_v0 *header;
340 
341  if (xml_text == NULL) {
342  crm_err("Invalid XML, can not send msg");
343  return -1;
344  }
345 
346  header = calloc(1, sizeof(struct crm_remote_header_v0));
347  iov[0].iov_base = header;
348  iov[0].iov_len = sizeof(struct crm_remote_header_v0);
349 
350  iov[1].iov_base = xml_text;
351  iov[1].iov_len = 1 + strlen(xml_text);
352 
353  id++;
354  header->id = id;
355  header->endian = ENDIAN_LOCAL;
356  header->version = REMOTE_MSG_VERSION;
357  header->payload_offset = iov[0].iov_len;
358  header->payload_uncompressed = iov[1].iov_len;
359  header->size_total = iov[0].iov_len + iov[1].iov_len;
360 
361  crm_trace("Sending len[0]=%d, start=%x",
362  (int)iov[0].iov_len, *(int*)(void*)xml_text);
363  rc = crm_remote_sendv(remote, iov, 2);
364  if (rc < 0) {
365  crm_err("Failed to send remote msg, rc = %d", rc);
366  }
367 
368  free(iov[0].iov_base);
369  free(iov[1].iov_base);
370  return rc;
371 }
372 
373 
379 xmlNode *
381 {
382  xmlNode *xml = NULL;
383  struct crm_remote_header_v0 *header = crm_remote_header(remote);
384 
385  if (remote->buffer == NULL || header == NULL) {
386  return NULL;
387  }
388 
389  /* Support compression on the receiving end now, in case we ever want to add it later */
390  if (header->payload_compressed) {
391  int rc = 0;
392  unsigned int size_u = 1 + header->payload_uncompressed;
393  char *uncompressed = calloc(1, header->payload_offset + size_u);
394 
395  crm_trace("Decompressing message data %d bytes into %d bytes",
396  header->payload_compressed, size_u);
397 
398  rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u,
399  remote->buffer + header->payload_offset,
400  header->payload_compressed, 1, 0);
401 
402  if (rc != BZ_OK && header->version > REMOTE_MSG_VERSION) {
403  crm_warn("Couldn't decompress v%d message, we only understand v%d",
404  header->version, REMOTE_MSG_VERSION);
405  free(uncompressed);
406  return NULL;
407 
408  } else if (rc != BZ_OK) {
409  crm_err("Decompression failed: %s (%d)", bz2_strerror(rc), rc);
410  free(uncompressed);
411  return NULL;
412  }
413 
414  CRM_ASSERT(size_u == header->payload_uncompressed);
415 
416  memcpy(uncompressed, remote->buffer, header->payload_offset); /* Preserve the header */
417  remote->buffer_size = header->payload_offset + size_u;
418 
419  free(remote->buffer);
420  remote->buffer = uncompressed;
421  header = crm_remote_header(remote);
422  }
423 
424  /* take ownership of the buffer */
425  remote->buffer_offset = 0;
426 
427  CRM_LOG_ASSERT(remote->buffer[sizeof(struct crm_remote_header_v0) + header->payload_uncompressed - 1] == 0);
428 
429  xml = string2xml(remote->buffer + header->payload_offset);
430  if (xml == NULL && header->version > REMOTE_MSG_VERSION) {
431  crm_warn("Couldn't parse v%d message, we only understand v%d",
432  header->version, REMOTE_MSG_VERSION);
433 
434  } else if (xml == NULL) {
435  crm_err("Couldn't parse: '%.120s'", remote->buffer + header->payload_offset);
436  }
437 
438  return xml;
439 }
440 
450 int
451 crm_remote_ready(crm_remote_t *remote, int total_timeout)
452 {
453  struct pollfd fds = { 0, };
454  int sock = 0;
455  int rc = 0;
456  time_t start;
457  int timeout = total_timeout;
458 
459 #ifdef HAVE_GNUTLS_GNUTLS_H
460  if (remote->tls_session) {
461  void *sock_ptr = gnutls_transport_get_ptr(*remote->tls_session);
462 
463  sock = GPOINTER_TO_INT(sock_ptr);
464  } else if (remote->tcp_socket) {
465 #else
466  if (remote->tcp_socket) {
467 #endif
468  sock = remote->tcp_socket;
469  } else {
470  crm_err("Unsupported connection type");
471  }
472 
473  if (sock <= 0) {
474  crm_trace("No longer connected");
475  return -ENOTCONN;
476  }
477 
478  start = time(NULL);
479  errno = 0;
480  do {
481  fds.fd = sock;
482  fds.events = POLLIN;
483 
484  /* If we got an EINTR while polling, and we have a
485  * specific timeout we are trying to honor, attempt
486  * to adjust the timeout to the closest second. */
487  if (errno == EINTR && (timeout > 0)) {
488  timeout = total_timeout - ((time(NULL) - start) * 1000);
489  if (timeout < 1000) {
490  timeout = 1000;
491  }
492  }
493 
494  rc = poll(&fds, 1, timeout);
495  } while (rc < 0 && errno == EINTR);
496 
497  return (rc < 0)? -errno : rc;
498 }
499 
500 
511 static size_t
512 crm_remote_recv_once(crm_remote_t * remote)
513 {
514  int rc = 0;
515  size_t read_len = sizeof(struct crm_remote_header_v0);
516  struct crm_remote_header_v0 *header = crm_remote_header(remote);
517 
518  if(header) {
519  /* Stop at the end of the current message */
520  read_len = header->size_total;
521  }
522 
523  /* automatically grow the buffer when needed */
524  if(remote->buffer_size < read_len) {
525  remote->buffer_size = 2 * read_len;
526  crm_trace("Expanding buffer to %llu bytes",
527  (unsigned long long) remote->buffer_size);
528 
529  remote->buffer = realloc_safe(remote->buffer, remote->buffer_size + 1);
530  CRM_ASSERT(remote->buffer != NULL);
531  }
532 
533 #ifdef HAVE_GNUTLS_GNUTLS_H
534  if (remote->tls_session) {
535  rc = gnutls_record_recv(*(remote->tls_session),
536  remote->buffer + remote->buffer_offset,
537  remote->buffer_size - remote->buffer_offset);
538  if (rc == GNUTLS_E_INTERRUPTED) {
539  rc = -EINTR;
540  } else if (rc == GNUTLS_E_AGAIN) {
541  rc = -EAGAIN;
542  } else if (rc < 0) {
543  crm_debug("TLS receive failed: %s (%d)", gnutls_strerror(rc), rc);
544  rc = -pcmk_err_generic;
545  }
546  } else if (remote->tcp_socket) {
547 #else
548  if (remote->tcp_socket) {
549 #endif
550  errno = 0;
551  rc = read(remote->tcp_socket,
552  remote->buffer + remote->buffer_offset,
553  remote->buffer_size - remote->buffer_offset);
554  if(rc < 0) {
555  rc = -errno;
556  }
557 
558  } else {
559  crm_err("Unsupported connection type");
560  return -ESOCKTNOSUPPORT;
561  }
562 
563  /* process any errors. */
564  if (rc > 0) {
565  remote->buffer_offset += rc;
566  /* always null terminate buffer, the +1 to alloc always allows for this. */
567  remote->buffer[remote->buffer_offset] = '\0';
568  crm_trace("Received %u more bytes, %llu total",
569  rc, (unsigned long long) remote->buffer_offset);
570 
571  } else if (rc == -EINTR || rc == -EAGAIN) {
572  crm_trace("non-blocking, exiting read: %s (%d)", pcmk_strerror(rc), rc);
573 
574  } else if (rc == 0) {
575  crm_debug("EOF encoutered after %llu bytes",
576  (unsigned long long) remote->buffer_offset);
577  return -ENOTCONN;
578 
579  } else {
580  crm_debug("Error receiving message after %llu bytes: %s (%d)",
581  (unsigned long long) remote->buffer_offset,
582  pcmk_strerror(rc), rc);
583  return -ENOTCONN;
584  }
585 
586  header = crm_remote_header(remote);
587  if(header) {
588  if(remote->buffer_offset < header->size_total) {
589  crm_trace("Read less than the advertised length: %llu < %u bytes",
590  (unsigned long long) remote->buffer_offset,
591  header->size_total);
592  } else {
593  crm_trace("Read full message of %llu bytes",
594  (unsigned long long) remote->buffer_offset);
595  return remote->buffer_offset;
596  }
597  }
598 
599  return -EAGAIN;
600 }
601 
612 gboolean
613 crm_remote_recv(crm_remote_t *remote, int total_timeout, int *disconnected)
614 {
615  int rc;
616  time_t start = time(NULL);
617  int remaining_timeout = 0;
618 
619  if (total_timeout == 0) {
620  total_timeout = 10000;
621  } else if (total_timeout < 0) {
622  total_timeout = 60000;
623  }
624  *disconnected = 0;
625 
626  remaining_timeout = total_timeout;
627  while ((remaining_timeout > 0) && !(*disconnected)) {
628 
629  crm_trace("Waiting for remote data (%d of %d ms timeout remaining)",
630  remaining_timeout, total_timeout);
631  rc = crm_remote_ready(remote, remaining_timeout);
632 
633  if (rc == 0) {
634  crm_err("Timed out (%d ms) while waiting for remote data",
635  remaining_timeout);
636  return FALSE;
637 
638  } else if (rc < 0) {
639  crm_debug("Wait for remote data aborted, will try again: %s "
640  CRM_XS " rc=%d", pcmk_strerror(rc), rc);
641 
642  } else {
643  rc = crm_remote_recv_once(remote);
644  if (rc > 0) {
645  return TRUE;
646  } else if (rc == -EAGAIN) {
647  crm_trace("Still waiting for remote data");
648  } else if (rc < 0) {
649  crm_debug("Could not receive remote data: %s " CRM_XS " rc=%d",
650  pcmk_strerror(rc), rc);
651  }
652  }
653 
654  if (rc == -ENOTCONN) {
655  *disconnected = 1;
656  return FALSE;
657  }
658 
659  remaining_timeout = total_timeout - ((time(NULL) - start) * 1000);
660  }
661 
662  return FALSE;
663 }
664 
665 struct tcp_async_cb_data {
666  gboolean success;
667  int sock;
668  void *userdata;
669  void (*callback) (void *userdata, int sock);
670  int timeout; /*ms */
671  time_t start;
672 };
673 
674 static gboolean
675 check_connect_finished(gpointer userdata)
676 {
677  struct tcp_async_cb_data *cb_data = userdata;
678  int rc = 0;
679  int sock = cb_data->sock;
680  int error = 0;
681 
682  fd_set rset, wset;
683  socklen_t len = sizeof(error);
684  struct timeval ts = { 0, };
685 
686  if (cb_data->success == TRUE) {
687  goto dispatch_done;
688  }
689 
690  FD_ZERO(&rset);
691  FD_SET(sock, &rset);
692  wset = rset;
693 
694  crm_trace("fd %d: checking to see if connect finished", sock);
695  rc = select(sock + 1, &rset, &wset, NULL, &ts);
696 
697  if (rc < 0) {
698  rc = errno;
699  if ((errno == EINPROGRESS) || (errno == EAGAIN)) {
700  /* reschedule if there is still time left */
701  if ((time(NULL) - cb_data->start) < (cb_data->timeout / 1000)) {
702  goto reschedule;
703  } else {
704  rc = -ETIMEDOUT;
705  }
706  }
707  crm_trace("fd %d: select failed %d connect dispatch ", sock, rc);
708  goto dispatch_done;
709  } else if (rc == 0) {
710  if ((time(NULL) - cb_data->start) < (cb_data->timeout / 1000)) {
711  goto reschedule;
712  }
713  crm_debug("fd %d: timeout during select", sock);
714  rc = -ETIMEDOUT;
715  goto dispatch_done;
716  } else {
717  crm_trace("fd %d: select returned success", sock);
718  rc = 0;
719  }
720 
721  /* can we read or write to the socket now? */
722  if (FD_ISSET(sock, &rset) || FD_ISSET(sock, &wset)) {
723  if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
724  crm_trace("fd %d: call to getsockopt failed", sock);
725  rc = -1;
726  goto dispatch_done;
727  }
728 
729  if (error) {
730  crm_trace("fd %d: error returned from getsockopt: %d", sock, error);
731  rc = -1;
732  goto dispatch_done;
733  }
734  } else {
735  crm_trace("neither read nor write set after select");
736  rc = -1;
737  goto dispatch_done;
738  }
739 
740  dispatch_done:
741  if (!rc) {
742  crm_trace("fd %d: connected", sock);
743  /* Success, set the return code to the sock to report to the callback */
744  rc = cb_data->sock;
745  cb_data->sock = 0;
746  } else {
747  close(sock);
748  }
749 
750  if (cb_data->callback) {
751  cb_data->callback(cb_data->userdata, rc);
752  }
753  free(cb_data);
754  return FALSE;
755 
756  reschedule:
757 
758  /* will check again next interval */
759  return TRUE;
760 }
761 
762 static int
763 internal_tcp_connect_async(int sock,
764  const struct sockaddr *addr, socklen_t addrlen, int timeout /* ms */ ,
765  int *timer_id, void *userdata, void (*callback) (void *userdata, int sock))
766 {
767  int rc = 0;
768  int flag = 0;
769  int interval = 500;
770  int timer;
771  struct tcp_async_cb_data *cb_data = NULL;
772 
773  if ((flag = fcntl(sock, F_GETFL)) >= 0) {
774  if (fcntl(sock, F_SETFL, flag | O_NONBLOCK) < 0) {
775  crm_err("fcntl() write failed");
776  return -1;
777  }
778  }
779 
780  rc = connect(sock, addr, addrlen);
781 
782  if (rc < 0 && (errno != EINPROGRESS) && (errno != EAGAIN)) {
783  return -1;
784  }
785 
786  cb_data = calloc(1, sizeof(struct tcp_async_cb_data));
787  cb_data->userdata = userdata;
788  cb_data->callback = callback;
789  cb_data->sock = sock;
790  cb_data->timeout = timeout;
791  cb_data->start = time(NULL);
792 
793  if (rc == 0) {
794  /* The connect was successful immediately, we still return to mainloop
795  * and let this callback get called later. This avoids the user of this api
796  * to have to account for the fact the callback could be invoked within this
797  * function before returning. */
798  cb_data->success = TRUE;
799  interval = 1;
800  }
801 
802  /* Check connect finished is mostly doing a non-block poll on the socket
803  * to see if we can read/write to it. Once we can, the connect has completed.
804  * This method allows us to connect to the server without blocking mainloop.
805  *
806  * This is a poor man's way of polling to see when the connection finished.
807  * At some point we should figure out a way to use a mainloop fd callback for this.
808  * Something about the way mainloop is currently polling prevents this from working at the
809  * moment though. */
810  crm_trace("fd %d: scheduling to check if connect finished in %dms second", sock, interval);
811  timer = g_timeout_add(interval, check_connect_finished, cb_data);
812  if (timer_id) {
813  *timer_id = timer;
814  }
815 
816  return 0;
817 }
818 
/*!
 * \internal
 * \brief Connect a socket synchronously, then switch it to non-blocking mode
 *
 * \param[in] sock     Socket to connect
 * \param[in] addr     Address to connect to
 * \param[in] addrlen  Size of \p addr
 *
 * \return 0 on success, the result of connect() if it failed, or -1 if the
 *         socket could not be made non-blocking
 */
static int
internal_tcp_connect(int sock, const struct sockaddr *addr, socklen_t addrlen)
{
    int fl = 0;
    int connected = connect(sock, addr, addrlen);

    if (connected != 0) {
        return connected;
    }

    /* Only touch the flags if they could be read in the first place */
    fl = fcntl(sock, F_GETFL);
    if ((fl >= 0) && (fcntl(sock, F_SETFL, fl | O_NONBLOCK) < 0)) {
        crm_err("fcntl() write failed");
        return -1;
    }

    return connected;
}
836 
/*!
 * \brief Connect to a remote host over TCP, optionally without blocking
 *
 * Each address returned for \p host is tried in turn until one connects
 * (or, when \p callback is given, until a connection attempt is started).
 *
 * \param[in]  host      Name or address of the host to connect to
 * \param[in]  port      TCP port to connect to
 * \param[in]  timeout   Milliseconds to allow for an asynchronous connect
 *                       (only used when \p callback is not NULL)
 * \param[out] timer_id  If not NULL, receives the timer ID of an asynchronous
 *                       attempt
 * \param[in]  userdata  Passed through to \p callback
 * \param[in]  callback  If not NULL, connect asynchronously and report the
 *                       outcome through this function
 *
 * \return Socket file descriptor on success (for asynchronous attempts, the
 *         socket whose connection is still in progress), or -1 on failure
 */
int
crm_remote_tcp_connect_async(const char *host, int port, int timeout, /*ms */
                             int *timer_id, void *userdata, void (*callback) (void *userdata, int sock))
{
    char addr_text[INET6_ADDRSTRLEN];
    struct addrinfo *results = NULL;
    struct addrinfo *candidate = NULL;
    struct addrinfo hints;
    const char *server = host;
    int lookup_rc;
    int sock = -1;

    /* getaddrinfo */
    memset(&hints, 0, sizeof(struct addrinfo));
    hints.ai_family = AF_UNSPEC;        /* Allow IPv4 or IPv6 */
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_CANONNAME;

    crm_debug("Looking up %s", server);
    lookup_rc = getaddrinfo(server, NULL, &hints, &results);
    if (lookup_rc != 0) {
        crm_err("getaddrinfo: %s", gai_strerror(lookup_rc));
        return -1;
    }

    if ((results == NULL) || (results->ai_addr == NULL)) {
        crm_err("getaddrinfo failed");

    } else {
        for (candidate = results; candidate != NULL; candidate = candidate->ai_next) {
            struct sockaddr *addr = candidate->ai_addr;
            int attempt_rc;

            if (addr == NULL) {
                continue;
            }

            if (candidate->ai_canonname != NULL) {
                server = results->ai_canonname;
            }
            crm_debug("Got address %s for %s", server, host);

            /* create socket */
            sock = socket(candidate->ai_family, SOCK_STREAM, IPPROTO_TCP);
            if (sock == -1) {
                crm_err("Socket creation failed for remote client connection.");
                continue;
            }

            /* Set port appropriately for address family */
            /* (void*) casts avoid false-positive compiler alignment warnings */
            if (addr->sa_family == AF_INET6) {
                ((struct sockaddr_in6 *)(void*)addr)->sin6_port = htons(port);
            } else {
                ((struct sockaddr_in *)(void*)addr)->sin_port = htons(port);
            }

            memset(addr_text, 0, DIMOF(addr_text));
            crm_sockaddr2str(addr, addr_text);
            crm_info("Attempting to connect to remote server at %s:%d", addr_text, port);

            if (callback != NULL) {
                /* Success here only means "started"; we'll hear back later in the callback */
                attempt_rc = internal_tcp_connect_async(sock, candidate->ai_addr,
                                                        candidate->ai_addrlen, timeout,
                                                        timer_id, userdata, callback);
            } else {
                attempt_rc = internal_tcp_connect(sock, candidate->ai_addr,
                                                  candidate->ai_addrlen);
            }

            if (attempt_rc == 0) {
                break;
            }

            /* This address did not work out; move on to the next one */
            close(sock);
            sock = -1;
        }
    }

    if (results != NULL) {
        freeaddrinfo(results);
    }
    return sock;
}
927 
/*!
 * \brief Connect to a remote host over TCP, blocking until done
 *
 * \param[in] host  Name or address of the host to connect to
 * \param[in] port  TCP port to connect to
 *
 * \return Result of crm_remote_tcp_connect_async() called without a callback
 */
int
crm_remote_tcp_connect(const char *host, int port)
{
    /* No callback means a synchronous connect; the timeout is then unused */
    const int unused_timeout = -1;

    return crm_remote_tcp_connect_async(host, port, unused_timeout, NULL, NULL, NULL);
}
933 
/*!
 * \brief Convert an IP address (IPv4 or IPv6) to a string for logging
 *
 * \param[in]  sa  Pointer to a struct sockaddr_in or struct sockaddr_in6
 * \param[out] s   Buffer of at least INET6_ADDRSTRLEN bytes that receives the
 *                 text form of the address, or "<invalid>" if the address
 *                 family is unsupported or the conversion fails
 */
void
crm_sockaddr2str(void *sa, char *s)
{
    const char *converted = NULL;

    switch (((struct sockaddr*)sa)->sa_family) {
        case AF_INET:
            converted = inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr),
                                  s, INET6_ADDRSTRLEN);
            break;

        case AF_INET6:
            converted = inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr),
                                  s, INET6_ADDRSTRLEN);
            break;

        default:
            break;
    }

    if (converted == NULL) {
        /* Never leave the caller with an unset buffer to log */
        strcpy(s, "<invalid>");
    }
}
962 
963 int
965 {
966  int csock = 0;
967  int rc = 0;
968  int flag = 0;
969  unsigned laddr = 0;
970  struct sockaddr_storage addr;
971  char addr_str[INET6_ADDRSTRLEN];
972 #ifdef TCP_USER_TIMEOUT
973  int optval;
974  long sbd_timeout = crm_get_sbd_timeout();
975 #endif
976 
977  /* accept the connection */
978  laddr = sizeof(addr);
979  memset(&addr, 0, sizeof(addr));
980  csock = accept(ssock, (struct sockaddr *)&addr, &laddr);
981  crm_sockaddr2str(&addr, addr_str);
982  crm_info("New remote connection from %s", addr_str);
983 
984  if (csock == -1) {
985  crm_err("accept socket failed");
986  return -1;
987  }
988 
989  if ((flag = fcntl(csock, F_GETFL)) >= 0) {
990  if ((rc = fcntl(csock, F_SETFL, flag | O_NONBLOCK)) < 0) {
991  crm_err("fcntl() write failed");
992  close(csock);
993  return rc;
994  }
995  } else {
996  crm_err("fcntl() read failed");
997  close(csock);
998  return flag;
999  }
1000 
1001 #ifdef TCP_USER_TIMEOUT
1002  if (sbd_timeout > 0) {
1003  optval = sbd_timeout / 2; /* time to fail and retry before watchdog */
1004  rc = setsockopt(csock, SOL_TCP, TCP_USER_TIMEOUT,
1005  &optval, sizeof(optval));
1006  if (rc < 0) {
1007  crm_err("setting TCP_USER_TIMEOUT (%d) on client socket failed",
1008  optval);
1009  close(csock);
1010  return rc;
1011  }
1012  }
1013 #endif
1014 
1015  return csock;
1016 }
A dumping ground.
size_t buffer_offset
Definition: ipcs.h:46
long crm_get_sbd_timeout(void)
Definition: watchdog.c:246
uint32_t payload_compressed
Definition: remote.c:159
const char * pcmk_strerror(int rc)
Definition: logging.c:1132
char * buffer
Definition: ipcs.h:44
AIS_Host host
Definition: internal.h:52
uint32_t payload_uncompressed
Definition: remote.c:160
#define CRM_LOG_ASSERT(expr)
Definition: logging.h:150
#define ENDIAN_LOCAL
Definition: remote.c:93
Wrappers for and extensions to glib mainloop.
uint32_t endian
Definition: remote.c:153
xmlNode * string2xml(const char *input)
Definition: xml.c:2774
int crm_remote_send(crm_remote_t *remote, xmlNode *msg)
Definition: remote.c:332
int crm_remote_tcp_connect(const char *host, int port)
Definition: remote.c:929
#define crm_warn(fmt, args...)
Definition: logging.h:249
#define crm_debug(fmt, args...)
Definition: logging.h:253
int crm_initiate_client_tls_handshake(crm_remote_t *remote, int timeout_ms)
void crm_sockaddr2str(void *sa, char *s)
Convert an IP address (IPv4 or IPv6) to a string for logging.
Definition: remote.c:945
void gnutls_session_t
Definition: cib_remote.c:52
int crm_remote_accept(int ssock)
Definition: remote.c:964
#define crm_trace(fmt, args...)
Definition: logging.h:254
uint64_t id
Definition: remote.c:155
int crm_remote_ready(crm_remote_t *remote, int total_timeout)
Definition: remote.c:451
#define __swab64(x)
Definition: remote.c:81
Wrappers for and extensions to libxml2.
#define pcmk_err_generic
Definition: error.h:45
uint32_t payload_offset
Definition: remote.c:158
struct tcp_async_cb_data __attribute__
uint32_t size_total
Definition: remote.c:157
#define CRM_XS
Definition: logging.h:42
#define __swab32(x)
Definition: remote.c:75
#define crm_perror(level, fmt, args...)
Log a system error message.
Definition: logging.h:226
size_t buffer_size
Definition: ipcs.h:45
#define REMOTE_MSG_VERSION
Definition: remote.c:92
#define crm_err(fmt, args...)
Definition: logging.h:248
const char * bz2_strerror(int rc)
Definition: logging.c:1195
char * dump_xml_unformatted(xmlNode *msg)
Definition: xml.c:3849
#define DIMOF(a)
Definition: crm.h:39
xmlNode * crm_remote_parse_buffer(crm_remote_t *remote)
Definition: remote.c:380
#define uint32_t
Definition: stdint.in.h:158
#define CRM_ASSERT(expr)
Definition: error.h:35
void * create_psk_tls_session(int csock, int type, void *credentials)
void * crm_create_anon_tls_session(int sock, int type, void *credentials)
int tcp_socket
Definition: ipcs.h:48
int crm_remote_tcp_connect_async(const char *host, int port, int timeout, int *timer_id, void *userdata, void(*callback)(void *userdata, int sock))
Definition: remote.c:844
#define crm_info(fmt, args...)
Definition: logging.h:251
gboolean crm_remote_recv(crm_remote_t *remote, int total_timeout, int *disconnected)
Definition: remote.c:613
uint32_t version
Definition: remote.c:154
uint64_t flags
Definition: remote.c:156
enum crm_ais_msg_types type
Definition: internal.h:51