ntopng.c 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. /*
  2. * Copyright 2018 Alastair D'Silva <alastair@d-silva.org> All rights reserved.
  3. *
  4. * Redistribution and use in source and binary forms, with or without
  5. * modification, are permitted provided that the following conditions
  6. * are met:
  7. * 1. Redistributions of source code must retain the above copyright
  8. * notice, this list of conditions and the following disclaimer.
  9. * 2. Redistributions in binary form must reproduce the above copyright
  10. * notice, this list of conditions and the following disclaimer in the
  11. * documentation and/or other materials provided with the distribution.
  12. *
  13. * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
  14. * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
  15. * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
  16. * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
  17. * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
  18. * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  19. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  20. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  21. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
  22. * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  23. */
  24. #include "common.h"
  25. #include "log.h"
  26. #include "treetype.h"
  27. #include "softflowd.h"
  28. #include <stdbool.h>
  29. struct NTOPNG_MSG_HEADER {
  30. char url[16];
  31. u_int8_t version, source_id;
  32. u_int16_t size;
  33. u_int32_t msg_id;
  34. } __attribute__((packed));
  35. /*
  36. * Connect to NTOPNG collector
  37. */
  38. int
  39. connect_ntopng(const char *host, const char *port, struct ZMQ *zmq) {
  40. void *context = zmq_ctx_new();
  41. void *pub_socket = zmq_socket(context, ZMQ_PUB);
  42. char connect_str[6 + NI_MAXHOST + 1 + NI_MAXSERV + 1]; /* "tcp://hostname:port" */
  43. if (!context)
  44. return errno;
  45. if (!pub_socket) {
  46. zmq_ctx_destroy(context);
  47. return errno;
  48. }
  49. snprintf(connect_str, sizeof(connect_str), "tcp://%s:%s", host, port);
  50. fprintf(stderr, "Connecting ZMQ socket '%s'\n", connect_str);
  51. if (zmq_connect (pub_socket, connect_str)) {
  52. zmq_close(pub_socket);
  53. zmq_ctx_destroy(context);
  54. return errno;
  55. }
  56. zmq->context = context;
  57. zmq->socket = pub_socket;
  58. return 0;
  59. }
  60. static int
  61. add_json_flow (struct SENDPARAMETER *sp, struct FLOW *flow, char *buf, size_t len)
  62. {
  63. int size = snprintf(buf, len,
  64. "{"
  65. "\"7\": %d," /* src port */
  66. "\"11\": %d," /* dst port */
  67. "\"1\": %d," /* in octets */
  68. "\"2\": %d," /* in packets */
  69. "\"23\": %d," /* out octets */
  70. "\"24\": %d," /* out packets */
  71. "\"22\": %d," /* start timestamp */
  72. "\"21\": %d," /* last timestamp */
  73. "\"6\": %d," /* tcp flags */
  74. "\"4\": %d", /* protocol */
  75. flow->port[0],
  76. flow->port[1],
  77. flow->octets[1],
  78. flow->packets[1],
  79. flow->octets[0],
  80. flow->packets[0],
  81. timeval_sub_ms (&flow->flow_start, &sp->param->system_boot_time),
  82. timeval_sub_ms (&flow->flow_last, &sp->param->system_boot_time),
  83. flow->tcp_flags[0],
  84. flow->protocol
  85. );
  86. if (size > (len - 1))
  87. return size;
  88. if (flow->af == AF_INET) {
  89. char src[INET_ADDRSTRLEN];
  90. char dst[INET_ADDRSTRLEN];
  91. /* safe to ignore errors, neither error case can occur */
  92. inet_ntop(AF_INET, &flow->addr[0].v4, src, sizeof(src));
  93. inet_ntop(AF_INET, &flow->addr[1].v4, dst, sizeof(dst));
  94. size += snprintf(buf + size, len - size,
  95. ",\"8\":\"%s\"," /* ipv4 src addr */
  96. "\"12\":\"%s\"", /* ipv4 dst addr */
  97. src,
  98. dst
  99. );
  100. } else {
  101. char src[INET6_ADDRSTRLEN];
  102. char dst[INET6_ADDRSTRLEN];
  103. /* safe to ignore errors, neither error case can occur */
  104. inet_ntop(AF_INET6, &flow->addr[0].v6, src, sizeof(src));
  105. inet_ntop(AF_INET6, &flow->addr[1].v6, dst, sizeof(dst));
  106. size += snprintf(buf + size, len - size,
  107. ",\"27\":\"%s\"," /* ipv6 src addr */
  108. "\"28\":\"%s\"", /* ipv6 dst addr */
  109. src,
  110. dst
  111. );
  112. }
  113. if (size > (len - 1))
  114. return size;
  115. if (sp->param->track_level >= TRACK_FULL_VLAN) {
  116. size += snprintf(buf + size, len - size,
  117. ",\"58\":%d," /* vlan src */
  118. "\"59\":%d", /* vlan dst */
  119. flow->vlanid[0],
  120. flow->vlanid[1]
  121. );
  122. if (size > (len - 1))
  123. return size;
  124. }
  125. if (sp->param->track_level >= TRACK_FULL_VLAN_ETHER) {
  126. size += snprintf(buf + size, len - size,
  127. ",\"56\":\"%d:%d:%d:%d:%d:%d\"," /* ether mac src */
  128. "\"57\":\"%d:%d:%d:%d:%d:%d\"", /* ether mac dst */
  129. flow->ethermac[0][0],flow->ethermac[0][1],flow->ethermac[0][2],
  130. flow->ethermac[0][3],flow->ethermac[0][4],flow->ethermac[0][5],
  131. flow->ethermac[1][0],flow->ethermac[1][1],flow->ethermac[1][2],
  132. flow->ethermac[1][3],flow->ethermac[1][4],flow->ethermac[1][5]
  133. );
  134. }
  135. if (size > (len - 1))
  136. return size;
  137. size += snprintf(buf + size, len - size, "}");
  138. return size;
  139. }
  140. #define MAX_JSON_SIZE 7168
  141. int
  142. send_ntopng_message (struct SENDPARAMETER *sp, int start_at_flow) {
  143. struct NTOPNG_MSG_HEADER header;
  144. static uint32_t msg_id = 0;
  145. char json[MAX_JSON_SIZE];
  146. int json_used = 0;
  147. int flow = start_at_flow;
  148. bool first = true;
  149. int target = 0;
  150. header.url[0] = 'f';
  151. header.url[1] = 'l';
  152. header.url[2] = 'o';
  153. header.url[3] = 'w';
  154. memset(header.url + 4, 0, sizeof(header.url) - 4);
  155. header.version = 2;
  156. header.msg_id = htonl(msg_id++);
  157. json_used += snprintf(json + json_used, MAX_JSON_SIZE - json_used, "[");
  158. while (flow < sp->num_flows) {
  159. int size = 0;
  160. if (first) {
  161. first = false;
  162. } else {
  163. json_used += snprintf (json + json_used, MAX_JSON_SIZE - json_used, ",\n");
  164. }
  165. size = add_json_flow (sp, sp->flows[flow], json + json_used, MAX_JSON_SIZE - json_used);
  166. if (size > (MAX_JSON_SIZE - json_used - 2 -2)) { /* space for "]\0" and next ",\n"*/
  167. break;
  168. }
  169. json_used += size;
  170. flow++;
  171. }
  172. json_used += snprintf (json + json_used, MAX_JSON_SIZE - json_used, "]");
  173. header.size = htons(json_used);
  174. for (target = 0; target < sp->target->num_destinations; target++) {
  175. zmq_send(sp->target->destinations[target].zmq.socket, &header, sizeof(header), ZMQ_SNDMORE);
  176. zmq_send(sp->target->destinations[target].zmq.socket, json, json_used, 0);
  177. }
  178. return flow;
  179. }
  180. int
  181. send_ntopng(struct SENDPARAMETER sp) {
  182. int flow = 0;
  183. int packets = 0;
  184. while (flow < sp.num_flows) {
  185. flow = send_ntopng_message(&sp, flow);
  186. packets++;
  187. }
  188. sp.param->records_sent += flow;
  189. sp.param->packets_sent += packets;
  190. #ifdef ENABLE_PTHREAD
  191. if (use_thread)
  192. free (sp.flows);
  193. #endif /* ENABLE_PTHREAD */
  194. return packets;
  195. }