softflowd.c 78 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521
  1. /*
  2. * Copyright 2002 Damien Miller <djm@mindrot.org> All rights reserved.
  3. *
  4. * Redistribution and use in source and binary forms, with or without
  5. * modification, are permitted provided that the following conditions
  6. * are met:
  7. * 1. Redistributions of source code must retain the above copyright
  8. * notice, this list of conditions and the following disclaimer.
  9. * 2. Redistributions in binary form must reproduce the above copyright
  10. * notice, this list of conditions and the following disclaimer in the
  11. * documentation and/or other materials provided with the distribution.
  12. *
  13. * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
  14. * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
  15. * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
  16. * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
  17. * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
  18. * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  19. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  20. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  21. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
  22. * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  23. */
  24. /*
 * This is a software implementation of Cisco's NetFlow(tm) traffic
  26. * reporting system. It operates by listening (via libpcap) on a
  27. * promiscuous interface and tracking traffic flows.
  28. *
  29. * Traffic flows are recorded by source/destination/protocol
  30. * IP address or, in the case of TCP and UDP, by
  31. * src_addr:src_port/dest_addr:dest_port/protocol
  32. *
 * Flows expire automatically after a period of inactivity (default: 1
 * hour). They may also be evicted (in order of age) in situations where
  35. * there are more flows than slots available.
  36. *
  37. * Netflow compatible packets are sent to a specified target host upon
  38. * flow expiry.
  39. *
  40. * As this implementation watches traffic promiscuously, it is likely to
  41. * place significant load on hosts or gateways on which it is installed.
  42. */
  43. #include "common.h"
  44. #include "sys-tree.h"
  45. #include "convtime.h"
  46. #include "softflowd.h"
  47. #include "treetype.h"
  48. #include "freelist.h"
  49. #include "log.h"
  50. #include "netflow9.h"
  51. #include "ipfix.h"
  52. #include "psamp.h"
  53. #include <pcap.h>
  54. #ifdef LINUX
  55. #include <net/if.h>
  56. #endif /* LINUX */
  57. #define IPFIX_PORT 4739
/* Global variables */
/* Non-zero enables extra debug logging (for example "ADD FLOW" messages). */
static int verbose_flag = 0; /* Debugging flag */
static u_int16_t if_index = 0; /* "manual" interface index */
/* Tracking granularity (TRACK_* value) consulted by flow_compare(). */
static int track_level;
/* NOTE(review): presumably the pcap snapshot length; not used in this part
 * of the file - confirm against the capture setup code. */
static int snaplen = 0;
#ifdef ENABLE_PTHREAD
/*
 * NOTE(review): the names suggest these are shared between the capture
 * path and a worker thread (mutex/condition pair, one buffered packet and
 * its pcap header, and a list of expired flows to send); none of them is
 * used in this part of the file - confirm against the threading code.
 */
pthread_mutex_t read_mutex;
pthread_cond_t read_cond;
int use_thread;
u_char packet_data[1500];
struct pcap_pkthdr packet_header;
struct FLOW *send_expired_flows;
#endif /* ENABLE_PTHREAD */
/* Signal handler flags */
/* Set to the signal number by sighand_graceful_shutdown(); 0 means none. */
static volatile sig_atomic_t graceful_shutdown_request = 0;
/*
 * Describes a datalink header and how to extract v4/v6 frames from it.
 * One instance exists per supported BPF datalink type (see lt[]).
 */
struct DATALINK {
  int dlt; /* BPF datalink type */
  int skiplen; /* Number of bytes to skip datalink header */
  int ft_off; /* Datalink frametype offset */
  int ft_len; /* Datalink frametype length */
  int ft_is_be; /* Set if frametype is big-endian */
  u_int32_t ft_mask; /* Mask applied to frametype */
  u_int32_t ft_v4; /* IPv4 frametype */
  u_int32_t ft_v6; /* IPv6 frametype */
};
/*
 * Datalink types that we know about.
 * Columns follow struct DATALINK:
 *   dlt, skiplen, ft_off, ft_len, ft_is_be, ft_mask, ft_v4, ft_v6
 * Entries guarded by #ifdef are only present when the corresponding DLT_*
 * constant is defined at compile time.
 */
static const struct DATALINK lt[] = {
  {DLT_EN10MB, 14, 12, 2, 1, 0xffffffff, 0x0800, 0x86dd},
  {DLT_PPP, 5, 3, 2, 1, 0xffffffff, 0x0021, 0x0057},
#ifdef DLT_LINUX_SLL
  {DLT_LINUX_SLL, 16, 14, 2, 1, 0xffffffff, 0x0800, 0x86dd},
#endif
  /* Raw IP: no link header, the "frametype" is the IP version nibble */
  {DLT_RAW, 0, 0, 1, 1, 0x000000f0, 0x0040, 0x0060},
  {DLT_NULL, 4, 0, 4, 0, 0xffffffff, AF_INET, AF_INET6},
#ifdef DLT_LOOP
  {DLT_LOOP, 4, 0, 4, 1, 0xffffffff, AF_INET, AF_INET6},
#endif
#ifdef DLT_PFLOG
  {DLT_PFLOG, 48, 1, 1, 0, 0x000000ff, AF_INET, AF_INET6},
#endif
  /* NOTE(review): dlt == -1 looks like an end-of-table sentinel; confirm
   * against the code that searches this table. */
  {-1, -1, -1, -1, -1, 0x00000000, 0xffff, 0xffff},
};
/* Netflow send functions */
/* Signature shared by every exporter; takes its arguments by value. */
typedef int (netflow_send_func_t) (struct SENDPARAMETER);
/* Describes one supported export protocol version. */
struct NETFLOW_SENDER {
  int version; /* Protocol version number used for lookup */
  netflow_send_func_t *func; /* Export function */
  netflow_send_func_t *bidir_func; /* NULL in every entry except IPFIX; presumably the bidirectional exporter */
  int v6_capable; /* NOTE(review): presumably non-zero if IPv6 flows can be exported */
};
/*
 * Array of NetFlow export functions that we know of, searched by
 * lookup_netflow_sender(). NB. nf[0] is default.
 * The NetFlow v9 entry uses the legacy sender when ENABLE_LEGACY is
 * defined; the ntopng entry exists only when ENABLE_NTOPNG is defined.
 */
static const struct NETFLOW_SENDER nf[] = {
  {5, send_netflow_v5, NULL, 0},
  {1, send_netflow_v1, NULL, 0},
#ifdef ENABLE_LEGACY
  {9, send_netflow_v9, NULL, 1},
#else /* ENABLE_LEGACY */
  {9, send_nflow9, NULL, 1},
#endif /* ENABLE_LEGACY */
  {NF_VERSION_IPFIX, send_ipfix, send_ipfix_bi, 1},
#ifdef ENABLE_NTOPNG
  {SOFTFLOWD_NF_VERSION_NTOPNG, send_ntopng, NULL, 1},
#endif
};
  123. static const struct NETFLOW_SENDER *
  124. lookup_netflow_sender (int version) {
  125. int i, r;
  126. for (i = 0, r = version; i < sizeof (nf) / sizeof (struct NETFLOW_SENDER);
  127. i++) {
  128. if (nf[i].version == r)
  129. return &nf[i];
  130. }
  131. return NULL;
  132. }
  133. /* Signal handlers */
/*
 * Signal handler: record the signal number in graceful_shutdown_request.
 * The handler only stores to a volatile sig_atomic_t and does nothing else.
 */
static void
sighand_graceful_shutdown (int signum) {
  graceful_shutdown_request = signum;
}
/*
 * Signal handler for unexpected signals: log a warning and terminate the
 * process at once via _exit(), with exit status 0.
 */
static void
sighand_other (int signum) {
  /* XXX: this may not be completely safe */
  /* NOTE(review): logit() is called from signal context; whether it is
   * async-signal-safe cannot be told from here - confirm. */
  logit (LOG_WARNING, "Exiting immediately on unexpected signal %d", signum);
  _exit (0);
}
  144. /*
  145. * This is the flow comparison function.
  146. */
  147. static int
  148. flow_compare (struct FLOW *a, struct FLOW *b) {
  149. /* Be careful to avoid signed vs unsigned issues here */
  150. int r, i;
  151. if (track_level == TRACK_FULL_VLAN || track_level == TRACK_FULL_VLAN_ETHER) {
  152. if (a->vlanid[0] != b->vlanid[0])
  153. return (a->vlanid[0] > b->vlanid[0] ? 1 : -1);
  154. if (a->vlanid[1] != b->vlanid[1])
  155. return (a->vlanid[1] > b->vlanid[1] ? 1 : -1);
  156. }
  157. if (track_level == TRACK_FULL_VLAN_ETHER) {
  158. if ((r = memcmp (&a->ethermac[0], &b->ethermac[0], 6)) != 0)
  159. return (r > 0 ? 1 : -1);
  160. if ((r = memcmp (&a->ethermac[1], &b->ethermac[1], 6)) != 0)
  161. return (r > 0 ? 1 : -1);
  162. }
  163. if (a->af != b->af)
  164. return (a->af > b->af ? 1 : -1);
  165. if ((r = memcmp (&a->addr[0], &b->addr[0], sizeof (a->addr[0]))) != 0)
  166. return (r > 0 ? 1 : -1);
  167. if ((r = memcmp (&a->addr[1], &b->addr[1], sizeof (a->addr[1]))) != 0)
  168. return (r > 0 ? 1 : -1);
  169. #ifdef notyet
  170. if (a->ip6_flowlabel[0] != 0 && b->ip6_flowlabel[0] != 0 &&
  171. a->ip6_flowlabel[0] != b->ip6_flowlabel[0])
  172. return (a->ip6_flowlabel[0] > b->ip6_flowlabel[0] ? 1 : -1);
  173. if (a->ip6_flowlabel[1] != 0 && b->ip6_flowlabel[1] != 0 &&
  174. a->ip6_flowlabel[1] != b->ip6_flowlabel[1])
  175. return (a->ip6_flowlabel[1] > b->ip6_flowlabel[1] ? 1 : -1);
  176. #endif
  177. if (a->protocol != b->protocol)
  178. return (a->protocol > b->protocol ? 1 : -1);
  179. if (a->port[0] != b->port[0])
  180. return (ntohs (a->port[0]) > ntohs (b->port[0]) ? 1 : -1);
  181. if (a->port[1] != b->port[1])
  182. return (ntohs (a->port[1]) > ntohs (b->port[1]) ? 1 : -1);
  183. if (a->mplsLabelStackDepth != b->mplsLabelStackDepth)
  184. return (a->mplsLabelStackDepth > b->mplsLabelStackDepth ? 1 : -1);
  185. for (i = 0; i < a->mplsLabelStackDepth; i++) {
  186. if (a->mplsLabels[i] != b->mplsLabels[i])
  187. return (a->mplsLabels[i] > b->mplsLabels[i] ? 1 : -1);
  188. }
  189. return (0);
  190. }
  191. /* Generate functions for flow tree */
  192. FLOW_PROTOTYPE (FLOWS, FLOW, trp, flow_compare);
  193. FLOW_GENERATE (FLOWS, FLOW, trp, flow_compare);
  194. /*
  195. * This is the expiry comparison function.
  196. */
  197. static int
  198. expiry_compare (struct EXPIRY *a, struct EXPIRY *b) {
  199. if (a->expires_at != b->expires_at)
  200. return (a->expires_at > b->expires_at ? 1 : -1);
  201. /* Make expiry entries unique by comparing flow sequence */
  202. if (a->flow->flow_seq != b->flow->flow_seq)
  203. return (a->flow->flow_seq > b->flow->flow_seq ? 1 : -1);
  204. return (0);
  205. }
/* Generate functions for expiry tree */
  207. EXPIRY_PROTOTYPE (EXPIRIES, EXPIRY, trp, expiry_compare);
  208. EXPIRY_GENERATE (EXPIRIES, EXPIRY, trp, expiry_compare);
  209. static struct FLOW *
  210. flow_get (struct FLOWTRACK *ft) {
  211. return freelist_get (&ft->flow_freelist);
  212. }
  213. static void
  214. flow_put (struct FLOWTRACK *ft, struct FLOW *flow) {
  215. return freelist_put (&ft->flow_freelist, flow);
  216. }
  217. static struct EXPIRY *
  218. expiry_get (struct FLOWTRACK *ft) {
  219. return freelist_get (&ft->expiry_freelist);
  220. }
  221. static void
  222. expiry_put (struct FLOWTRACK *ft, struct EXPIRY *expiry) {
  223. return freelist_put (&ft->expiry_freelist, expiry);
  224. }
  225. #if 0
  226. /* Dump a packet */
  227. static void
  228. dump_packet (const u_int8_t * p, int len) {
  229. char buf[1024], tmp[3];
  230. int i;
  231. for (*buf = '\0', i = 0; i < len; i++) {
  232. snprintf (tmp, sizeof (tmp), "%02x%s", p[i], i % 2 ? " " : "");
  233. if (strlcat (buf, tmp, sizeof (buf) - 4) >= sizeof (buf) - 4) {
  234. strlcat (buf, "...", sizeof (buf));
  235. break;
  236. }
  237. }
  238. logit (LOG_INFO, "packet len %d: %s", len, buf);
  239. }
  240. #endif
  241. /* Format a time in an ISOish format */
  242. static const char *
  243. format_time (time_t t) {
  244. struct tm *tm;
  245. static char buf[32];
  246. tm = gmtime (&t);
  247. strftime (buf, sizeof (buf), "%Y-%m-%dT%H:%M:%S", tm);
  248. return (buf);
  249. }
  250. static const char *
  251. format_ethermac (uint8_t ethermac[6]) {
  252. static char buf[1024];
  253. snprintf (buf, sizeof (buf), "%.2x:%.2x:%.2x:%.2x:%.2x:%.2x",
  254. ethermac[0], ethermac[1], ethermac[2], ethermac[3],
  255. ethermac[4], ethermac[5]);
  256. return buf;
  257. }
  258. /* Format a flow in a verbose and ugly way */
  259. static const char *
  260. format_flow (struct FLOW *flow) {
  261. char addr1[64], addr2[64], start_time[32], fin_time[32];
  262. static char buf[4096];
  263. inet_ntop (flow->af, &flow->addr[0], addr1, sizeof (addr1));
  264. inet_ntop (flow->af, &flow->addr[1], addr2, sizeof (addr2));
  265. snprintf (start_time, sizeof (start_time), "%s",
  266. format_time (flow->flow_start.tv_sec));
  267. snprintf (fin_time, sizeof (fin_time), "%s",
  268. format_time (flow->flow_last.tv_sec));
  269. snprintf (buf, sizeof (buf),
  270. "seq:%" PRIu64 " [%s]:%hu <> [%s]:%hu proto:%u "
  271. "octets>:%u packets>:%u octets<:%u packets<:%u "
  272. "start:%s.%03ld finish:%s.%03ld tcp>:%02x tcp<:%02x "
  273. "flowlabel>:%08x flowlabel<:%08x "
  274. "vlan>:%u vlan<:%u ether:%s <> %s", flow->flow_seq, addr1,
  275. ntohs (flow->port[0]), addr2, ntohs (flow->port[1]),
  276. (int) flow->protocol, flow->octets[0], flow->packets[0],
  277. flow->octets[1], flow->packets[1], start_time,
  278. (flow->flow_start.tv_usec + 500) / 1000, fin_time,
  279. (flow->flow_last.tv_usec + 500) / 1000, flow->tcp_flags[0],
  280. flow->tcp_flags[1], flow->ip6_flowlabel[0],
  281. flow->ip6_flowlabel[1], flow->vlanid[0], flow->vlanid[1],
  282. format_ethermac (flow->ethermac[0]),
  283. format_ethermac (flow->ethermac[1]));
  284. return (buf);
  285. }
  286. /* Format a flow in a brief way */
  287. static const char *
  288. format_flow_brief (struct FLOW *flow) {
  289. char addr1[64], addr2[64];
  290. static char buf[4096];
  291. inet_ntop (flow->af, &flow->addr[0], addr1, sizeof (addr1));
  292. inet_ntop (flow->af, &flow->addr[1], addr2, sizeof (addr2));
  293. snprintf (buf, sizeof (buf),
  294. "seq:%" PRIu64 " [%s]:%hu <> [%s]:%hu proto:%u "
  295. "vlan>:%u vlan<:%u ether:%s <> %s ",
  296. flow->flow_seq,
  297. addr1, ntohs (flow->port[0]), addr2, ntohs (flow->port[1]),
  298. (int) flow->protocol, flow->vlanid[0], flow->vlanid[1],
  299. format_ethermac (flow->ethermac[0]),
  300. format_ethermac (flow->ethermac[1]));
  301. return (buf);
  302. }
  303. /* Fill in transport-layer (tcp/udp) portions of flow record */
  304. static int
  305. transport_to_flowrec (struct FLOW *flow, const u_int8_t * pkt,
  306. const size_t caplen, int isfrag, int protocol, int ndx)
  307. {
  308. const struct tcphdr *tcp = (const struct tcphdr *) pkt;
  309. const struct udphdr *udp = (const struct udphdr *) pkt;
  310. const struct icmp *icmp = (const struct icmp *) pkt;
  311. /*
  312. * XXX to keep flow in proper canonical format, it may be necessary to
  313. * swap the array slots based on the order of the port numbers does
  314. * this matter in practice??? I don't think so - return flows will
  315. * always match, because of their symmetrical addr/ports
  316. */
  317. switch (protocol) {
  318. case IPPROTO_TCP:
  319. /* Check for runt packet, but don't error out on short frags */
  320. if (caplen < sizeof (*tcp))
  321. return (isfrag ? 0 : 1);
  322. flow->port[ndx] = tcp->th_sport;
  323. flow->port[ndx ^ 1] = tcp->th_dport;
  324. flow->tcp_flags[ndx] |= tcp->th_flags;
  325. break;
  326. case IPPROTO_UDP:
  327. /* Check for runt packet, but don't error out on short frags */
  328. if (caplen < sizeof (*udp))
  329. return (isfrag ? 0 : 1);
  330. flow->port[ndx] = udp->uh_sport;
  331. flow->port[ndx ^ 1] = udp->uh_dport;
  332. break;
  333. case IPPROTO_ICMP:
  334. case IPPROTO_ICMPV6:
  335. /*
  336. * Encode ICMP type * 256 + code into dest port like
  337. * Cisco routers
  338. */
  339. flow->port[ndx] = 0;
  340. flow->port[ndx ^ 1] = htons (icmp->icmp_type * 256 + icmp->icmp_code);
  341. break;
  342. }
  343. return (0);
  344. }
  345. static int
  346. make_ndx_ipv4 (const struct ip *ip, size_t caplen) {
  347. if (caplen < 20 || caplen < ip->ip_hl * 4)
  348. return (-1); /* Runt packet */
  349. if (ip->ip_v != 4)
  350. return (-1); /* Unsupported IP version */
  351. /* Prepare to store flow in canonical format */
  352. return (memcmp (&ip->ip_src, &ip->ip_dst, sizeof (ip->ip_src)) > 0 ? 1 : 0);
  353. }
  354. /* Convert a IPv4 packet to a partial flow record (used for comparison) */
  355. static int
  356. ipv4_to_flowrec (struct FLOW *flow, const u_int8_t * pkt, size_t caplen,
  357. size_t len, int *isfrag, int af, int ndx) {
  358. const struct ip *ip = (const struct ip *) pkt;
  359. //int ndx = make_ndx_ipv4 (ip, caplen);
  360. if (ndx < 0)
  361. return (-1);
  362. flow->af = af;
  363. flow->addr[ndx].v4 = ip->ip_src;
  364. flow->addr[ndx ^ 1].v4 = ip->ip_dst;
  365. flow->protocol = ip->ip_p;
  366. flow->octets[ndx] = len;
  367. flow->packets[ndx] = 1;
  368. flow->tos[ndx] = ip->ip_tos;
  369. *isfrag = (ntohs (ip->ip_off) & (IP_OFFMASK | IP_MF)) ? 1 : 0;
  370. /* Don't try to examine higher level headers if not first fragment */
  371. if (*isfrag && (ntohs (ip->ip_off) & IP_OFFMASK) != 0)
  372. return (0);
  373. return (transport_to_flowrec (flow, pkt + (ip->ip_hl * 4),
  374. caplen - (ip->ip_hl * 4), *isfrag, ip->ip_p,
  375. ndx));
  376. }
  377. static int
  378. make_ndx_ipv6 (const struct ip6_hdr *ip6, size_t caplen) {
  379. if (caplen < sizeof (*ip6))
  380. return (-1); /* Runt packet */
  381. if ((ip6->ip6_vfc & IPV6_VERSION_MASK) != IPV6_VERSION)
  382. return (-1); /* Unsupported IPv6 version */
  383. /* Prepare to store flow in canonical format */
  384. return (memcmp (&ip6->ip6_src, &ip6->ip6_dst,
  385. sizeof (ip6->ip6_src)) > 0 ? 1 : 0);
  386. }
  387. /* Convert a IPv6 packet to a partial flow record (used for comparison) */
  388. static int
  389. ipv6_to_flowrec (struct FLOW *flow, const u_int8_t * pkt, size_t caplen,
  390. size_t len, int *isfrag, int af, int ndx) {
  391. const struct ip6_hdr *ip6 = (const struct ip6_hdr *) pkt;
  392. const struct ip6_ext *eh6;
  393. const struct ip6_frag *fh6;
  394. int nxt;
  395. if (ndx < 0)
  396. return (-1);
  397. flow->af = af;
  398. flow->ip6_flowlabel[ndx] = ip6->ip6_flow & IPV6_FLOWLABEL_MASK;
  399. flow->addr[ndx].v6 = ip6->ip6_src;
  400. flow->addr[ndx ^ 1].v6 = ip6->ip6_dst;
  401. flow->octets[ndx] = len;
  402. flow->packets[ndx] = 1;
  403. flow->tos[ndx] = (ntohl (ip6->ip6_flow) & ntohl (0x0ff00000)) >> 20;
  404. *isfrag = 0;
  405. nxt = ip6->ip6_nxt;
  406. pkt += sizeof (*ip6);
  407. caplen -= sizeof (*ip6);
  408. /* Now loop through headers, looking for transport header */
  409. for (;;) {
  410. eh6 = (const struct ip6_ext *) pkt;
  411. if (nxt == IPPROTO_HOPOPTS ||
  412. nxt == IPPROTO_ROUTING || nxt == IPPROTO_DSTOPTS) {
  413. if (caplen < sizeof (*eh6) || caplen < (eh6->ip6e_len + 1) << 3)
  414. return (1); /* Runt */
  415. nxt = eh6->ip6e_nxt;
  416. pkt += (eh6->ip6e_len + 1) << 3;
  417. caplen -= (eh6->ip6e_len + 1) << 3;
  418. } else if (nxt == IPPROTO_FRAGMENT) {
  419. *isfrag = 1;
  420. fh6 = (const struct ip6_frag *) eh6;
  421. if (caplen < sizeof (*fh6))
  422. return (1); /* Runt */
  423. /*
  424. * Don't try to examine higher level headers if
  425. * not first fragment
  426. */
  427. if ((fh6->ip6f_offlg & IP6F_OFF_MASK) != 0)
  428. return (0);
  429. nxt = fh6->ip6f_nxt;
  430. pkt += sizeof (*fh6);
  431. caplen -= sizeof (*fh6);
  432. } else
  433. break;
  434. }
  435. flow->protocol = nxt;
  436. return (transport_to_flowrec (flow, pkt, caplen, *isfrag, nxt, ndx));
  437. }
  438. static int
  439. vlan_to_flowrec (struct FLOW *flow, u_int16_t vlanid, int ndx) {
  440. if (ndx < 0)
  441. return (-1);
  442. return (flow->vlanid[ndx] = vlanid);
  443. }
  444. static int
  445. ether_to_flowrec (struct FLOW *flow, struct ether_header *ether, int ndx) {
  446. if (ndx < 0)
  447. return (-1);
  448. if (ether == NULL)
  449. return (-1);
  450. memcpy (flow->ethermac[ndx], ether->ether_shost, ETH_ALEN);
  451. memcpy (flow->ethermac[ndx ^ 1], ether->ether_dhost, ETH_ALEN);
  452. return (1);
  453. }
  454. static void
  455. flow_update_expiry (struct FLOWTRACK *ft, struct FLOW *flow) {
  456. EXPIRY_REMOVE (EXPIRIES, &ft->expiries, flow->expiry);
  457. /* Flows over 2 GiB traffic */
  458. if (flow->octets[0] > (1U << 31) || flow->octets[1] > (1U << 31)) {
  459. flow->expiry->expires_at = 0;
  460. flow->expiry->reason = R_OVERBYTES;
  461. flow->flowEndReason = IPFIX_flowEndReason_lackOfResource;
  462. goto out;
  463. }
  464. /* Flows over maximum life seconds */
  465. if (ft->param.maximum_lifetime != 0 &&
  466. flow->flow_last.tv_sec - flow->flow_start.tv_sec >
  467. ft->param.maximum_lifetime) {
  468. flow->expiry->expires_at = 0;
  469. flow->expiry->reason = R_MAXLIFE;
  470. flow->flowEndReason = IPFIX_flowEndReason_activeTimeout;
  471. goto out;
  472. }
  473. if (flow->protocol == IPPROTO_TCP) {
  474. /* Reset TCP flows */
  475. if (ft->param.tcp_rst_timeout != 0 &&
  476. ((flow->tcp_flags[0] & TH_RST) || (flow->tcp_flags[1] & TH_RST))) {
  477. flow->expiry->expires_at = flow->flow_last.tv_sec +
  478. ft->param.tcp_rst_timeout;
  479. flow->expiry->reason = R_TCP_RST;
  480. flow->flowEndReason = IPFIX_flowEndReason_endOfFlow;
  481. goto out;
  482. }
  483. /* Finished TCP flows */
  484. if (ft->param.tcp_fin_timeout != 0 &&
  485. ((flow->tcp_flags[0] & TH_FIN) && (flow->tcp_flags[1] & TH_FIN))) {
  486. flow->expiry->expires_at = flow->flow_last.tv_sec +
  487. ft->param.tcp_fin_timeout;
  488. flow->expiry->reason = R_TCP_FIN;
  489. flow->flowEndReason = IPFIX_flowEndReason_endOfFlow;
  490. goto out;
  491. }
  492. /* TCP flows */
  493. if (ft->param.tcp_timeout != 0) {
  494. flow->expiry->expires_at = flow->flow_last.tv_sec +
  495. ft->param.tcp_timeout;
  496. flow->expiry->reason = R_TCP;
  497. flow->flowEndReason = IPFIX_flowEndReason_idleTimeout;
  498. goto out;
  499. }
  500. }
  501. if (ft->param.udp_timeout != 0 && flow->protocol == IPPROTO_UDP) {
  502. /* UDP flows */
  503. flow->expiry->expires_at = flow->flow_last.tv_sec + ft->param.udp_timeout;
  504. flow->expiry->reason = R_UDP;
  505. flow->flowEndReason = IPFIX_flowEndReason_idleTimeout;
  506. goto out;
  507. }
  508. if (ft->param.icmp_timeout != 0 &&
  509. ((flow->af == AF_INET && flow->protocol == IPPROTO_ICMP) ||
  510. ((flow->af == AF_INET6 && flow->protocol == IPPROTO_ICMPV6)))) {
  511. /* ICMP flows */
  512. flow->expiry->expires_at = flow->flow_last.tv_sec +
  513. ft->param.icmp_timeout;
  514. flow->expiry->reason = R_ICMP;
  515. flow->flowEndReason = IPFIX_flowEndReason_idleTimeout;
  516. goto out;
  517. }
  518. /* Everything else */
  519. flow->expiry->expires_at = flow->flow_last.tv_sec +
  520. ft->param.general_timeout;
  521. flow->expiry->reason = R_GENERAL;
  522. flow->flowEndReason = IPFIX_flowEndReason_idleTimeout;
  523. out:
  524. if (ft->param.maximum_lifetime != 0 && flow->expiry->expires_at != 0) {
  525. flow->expiry->expires_at = MIN (flow->expiry->expires_at,
  526. flow->flow_start.tv_sec +
  527. ft->param.maximum_lifetime);
  528. }
  529. EXPIRY_INSERT (EXPIRIES, &ft->expiries, flow->expiry);
  530. }
/* Return values from process_packet */
#define PP_OK 0 /* NOTE(review): presumably the success code; its use is not visible in this part of the file */
#define PP_BAD_PACKET -2 /* Packet rejected; counted in bad_packets */
#define PP_MALLOC_FAIL -3 /* flow_get() or expiry_get() returned NULL */
  535. /*
  536. * Main per-packet processing function. Take a packet (provided by
  537. * libpcap) and attempt to find a matching flow. If no such flow exists,
  538. * then create one.
  539. *
  540. * Also marks flows for fast expiry, based on flow or packet attributes
  541. * (the actual expiry is performed elsewhere)
  542. */
  543. static int
  544. process_packet (struct FLOWTRACK *ft, const u_int8_t * frame_data, int af,
  545. const u_int32_t caplen, const u_int32_t len,
  546. struct ether_header *ether, u_int16_t vlanid,
  547. const struct timeval *received_time, u_int8_t num_label) {
  548. struct FLOW tmp, *flow;
  549. int frag, ndx, i;
  550. const u_int8_t *pkt = frame_data + num_label * 4;
  551. /* Convert the IP packet to a flow identity */
  552. memset (&tmp, 0, sizeof (tmp));
  553. switch (af) {
  554. case AF_INET:
  555. ndx = make_ndx_ipv4 ((const struct ip *) pkt, caplen);
  556. if (ipv4_to_flowrec (&tmp, pkt, caplen, len, &frag, af, ndx) == -1)
  557. goto bad;
  558. break;
  559. case AF_INET6:
  560. ndx = make_ndx_ipv6 ((const struct ip6_hdr *) pkt, caplen);
  561. if (ipv6_to_flowrec (&tmp, pkt, caplen, len, &frag, af, ndx) == -1)
  562. goto bad;
  563. break;
  564. default:
  565. bad:
  566. ft->param.bad_packets++;
  567. return (PP_BAD_PACKET);
  568. }
  569. if (frag)
  570. ft->param.frag_packets++;
  571. /* Zero out bits of the flow that aren't relevant to tracking level */
  572. switch (ft->param.track_level) {
  573. case TRACK_IP_ONLY:
  574. tmp.protocol = 0;
  575. /* FALLTHROUGH */
  576. case TRACK_IP_PROTO:
  577. tmp.port[0] = tmp.port[1] = 0;
  578. tmp.tcp_flags[0] = tmp.tcp_flags[1] = 0;
  579. /* FALLTHROUGH */
  580. case TRACK_FULL:
  581. tmp.vlanid[0] = tmp.vlanid[1] = 0;
  582. break;
  583. case TRACK_FULL_VLAN_ETHER:
  584. ether_to_flowrec (&tmp, ether, ndx);
  585. /* FALLTHROUGH */
  586. case TRACK_FULL_VLAN:
  587. vlan_to_flowrec (&tmp, vlanid, ndx);
  588. break;
  589. }
  590. tmp.mplsLabelStackDepth = num_label;
  591. for (i = 0; i < num_label && i < 10; i++) {
  592. tmp.mplsLabels[i] = *(((u_int32_t *) frame_data) + i);
  593. }
  594. /* If a matching flow does not exist, create and insert one */
  595. if ((flow = FLOW_FIND (FLOWS, &ft->flows, &tmp)) == NULL) {
  596. /* Allocate and fill in the flow */
  597. if ((flow = flow_get (ft)) == NULL) {
  598. logit (LOG_ERR, "process_packet: flow_get failed", sizeof (*flow));
  599. return (PP_MALLOC_FAIL);
  600. }
  601. memcpy (flow, &tmp, sizeof (*flow));
  602. memcpy (&flow->flow_start, received_time, sizeof (flow->flow_start));
  603. flow->flow_seq = ft->param.next_flow_seq++;
  604. FLOW_INSERT (FLOWS, &ft->flows, flow);
  605. /* Allocate and fill in the associated expiry event */
  606. if ((flow->expiry = expiry_get (ft)) == NULL) {
  607. logit (LOG_ERR, "process_packet: expiry_get failed",
  608. sizeof (*flow->expiry));
  609. return (PP_MALLOC_FAIL);
  610. }
  611. flow->expiry->flow = flow;
  612. /* Must be non-zero (0 means expire immediately) */
  613. flow->expiry->expires_at = 1;
  614. flow->expiry->reason = R_GENERAL;
  615. flow->flowEndReason = IPFIX_flowEndReason_idleTimeout;
  616. EXPIRY_INSERT (EXPIRIES, &ft->expiries, flow->expiry);
  617. ft->param.num_flows++;
  618. if (verbose_flag)
  619. logit (LOG_DEBUG, "ADD FLOW %s", format_flow_brief (flow));
  620. } else {
  621. /* Update flow statistics */
  622. flow->packets[0] += tmp.packets[0];
  623. flow->octets[0] += tmp.octets[0];
  624. flow->tcp_flags[0] |= tmp.tcp_flags[0];
  625. flow->packets[1] += tmp.packets[1];
  626. flow->octets[1] += tmp.octets[1];
  627. flow->tcp_flags[1] |= tmp.tcp_flags[1];
  628. }
  629. memcpy (&flow->flow_last, received_time, sizeof (flow->flow_last));
  630. if (flow->expiry->expires_at != 0)
  631. flow_update_expiry (ft, flow);
  632. return (PP_OK);
  633. }
  /*
   * Compute the difference (t1 - t2) between two timevals and return it
   * in milliseconds, truncated to 32 bits.
   */
  u_int32_t
  timeval_sub_ms (const struct timeval *t1, const struct timeval *t2) {
    struct timeval diff;

    diff.tv_usec = t1->tv_usec - t2->tv_usec;
    diff.tv_sec = t1->tv_sec - t2->tv_sec;
    /* Borrow one second when the microsecond part went negative */
    if (diff.tv_usec < 0) {
      diff.tv_sec--;
      diff.tv_usec += 1000000L;
    }
    return ((u_int32_t) diff.tv_sec * 1000 + (u_int32_t) diff.tv_usec / 1000);
  }
  648. int
  649. send_multi_destinations (int num_destinations,
  650. struct DESTINATION *destinations,
  651. u_int8_t is_loadbalance, u_int8_t * packet,
  652. int size) {
  653. struct DESTINATION *dest;
  654. int i, err;
  655. socklen_t errsz;
  656. static u_int64_t sent = 0;
  657. for (i = 0; i < num_destinations; i++) {
  658. if (!is_loadbalance || (is_loadbalance && (sent % num_destinations == i))) {
  659. dest = &destinations[i];
  660. errsz = sizeof (err);
  661. getsockopt (dest->sock, SOL_SOCKET, SO_ERROR, &err, &errsz); // Clear ICMP errors
  662. if (send (dest->sock, packet, (size_t) size, 0) == -1)
  663. return (-1);
  664. }
  665. }
  666. sent++;
  667. return is_loadbalance ? 1 : i;
  668. }
  669. static void
  670. update_statistic (struct STATISTIC *s, double new, double n) {
  671. if (n == 1.0) {
  672. s->min = s->mean = s->max = new;
  673. return;
  674. }
  675. s->min = MIN (s->min, new);
  676. s->max = MAX (s->max, new);
  677. s->mean = s->mean + ((new - s->mean) / n);
  678. }
  679. /* Update global statistics */
  680. static void
  681. update_statistics (struct FLOWTRACK *ft, struct FLOW *flow) {
  682. double tmp;
  683. static double n = 1.0;
  684. ft->param.flows_expired++;
  685. ft->param.flows_pp[flow->protocol % 256]++;
  686. tmp = (double) flow->flow_last.tv_sec +
  687. ((double) flow->flow_last.tv_usec / 1000000.0);
  688. tmp -= (double) flow->flow_start.tv_sec +
  689. ((double) flow->flow_start.tv_usec / 1000000.0);
  690. if (tmp < 0.0)
  691. tmp = 0.0;
  692. update_statistic (&ft->param.duration, tmp, n);
  693. update_statistic (&ft->param.duration_pp[flow->protocol], tmp,
  694. (double) ft->param.flows_pp[flow->protocol % 256]);
  695. tmp = flow->octets[0] + flow->octets[1];
  696. update_statistic (&ft->param.octets, tmp, n);
  697. ft->param.octets_pp[flow->protocol % 256] += tmp;
  698. tmp = flow->packets[0] + flow->packets[1];
  699. update_statistic (&ft->param.packets, tmp, n);
  700. ft->param.packets_pp[flow->protocol % 256] += tmp;
  701. n++;
  702. }
  703. static void
  704. update_expiry_stats (struct FLOWTRACK *ft, struct EXPIRY *e) {
  705. switch (e->reason) {
  706. case R_GENERAL:
  707. ft->param.expired_general++;
  708. break;
  709. case R_TCP:
  710. ft->param.expired_tcp++;
  711. break;
  712. case R_TCP_RST:
  713. ft->param.expired_tcp_rst++;
  714. break;
  715. case R_TCP_FIN:
  716. ft->param.expired_tcp_fin++;
  717. break;
  718. case R_UDP:
  719. ft->param.expired_udp++;
  720. break;
  721. case R_ICMP:
  722. ft->param.expired_icmp++;
  723. break;
  724. case R_MAXLIFE:
  725. ft->param.expired_maxlife++;
  726. break;
  727. case R_OVERBYTES:
  728. ft->param.expired_overbytes++;
  729. break;
  730. case R_OVERFLOWS:
  731. ft->param.expired_maxflows++;
  732. break;
  733. case R_FLUSH:
  734. ft->param.expired_flush++;
  735. break;
  736. }
  737. }
  738. /* How long before the next expiry event in millisecond */
  739. static int
  740. next_expire (struct FLOWTRACK *ft) {
  741. struct EXPIRY *expiry;
  742. struct timeval now;
  743. u_int32_t expires_at, ret, fudge;
  744. if (ft->param.adjust_time)
  745. now = ft->param.last_packet_time;
  746. else
  747. gettimeofday (&now, NULL);
  748. if ((expiry = EXPIRY_MIN (EXPIRIES, &ft->expiries)) == NULL)
  749. return (-1); /* indefinite */
  750. expires_at = expiry->expires_at;
  751. /* Don't cluster urgent expiries */
  752. if (expires_at == 0 && (expiry->reason == R_OVERBYTES ||
  753. expiry->reason == R_OVERFLOWS
  754. || expiry->reason == R_FLUSH))
  755. return (0); /* Now */
  756. /* Cluster expiries by expiry_interval */
  757. if (ft->param.expiry_interval > 1) {
  758. if ((fudge = expires_at % ft->param.expiry_interval) > 0)
  759. expires_at += ft->param.expiry_interval - fudge;
  760. }
  761. if (expires_at < now.tv_sec)
  762. return (0); /* Now */
  763. ret = 999 + (expires_at - now.tv_sec) * 1000;
  764. return (ret);
  765. }
  766. /*
  767. * Scan the tree of expiry events and process expired flows. If zap_all
  768. * is set, then forcibly expire all flows.
  769. */
  770. #define CE_EXPIRE_NORMAL 0 /* Normal expiry processing */
  771. #define CE_EXPIRE_ALL -1 /* Expire all flows immediately */
  772. #define CE_EXPIRE_FORCED 1 /* Only expire force-expired flows */
  773. static int
  774. check_expired (struct FLOWTRACK *ft, struct NETFLOW_TARGET *target, int ex) {
  775. struct FLOW **expired_flows, **oldexp;
  776. int num_expired, i, r;
  777. struct timeval now;
  778. struct EXPIRY *expiry, *nexpiry;
  779. if (ft->param.adjust_time)
  780. now = ft->param.last_packet_time;
  781. else
  782. gettimeofday (&now, NULL);
  783. r = 0;
  784. num_expired = 0;
  785. expired_flows = NULL;
  786. if (verbose_flag)
  787. logit (LOG_DEBUG, "Starting expiry scan: mode %d", ex);
  788. for (expiry = EXPIRY_MIN (EXPIRIES, &ft->expiries);
  789. expiry != NULL; expiry = nexpiry) {
  790. nexpiry = EXPIRY_NEXT (EXPIRIES, &ft->expiries, expiry);
  791. if ((expiry->expires_at == 0) || (ex == CE_EXPIRE_ALL) ||
  792. (ex != CE_EXPIRE_FORCED && (expiry->expires_at < now.tv_sec))) {
  793. /* Flow has expired */
  794. if (ft->param.maximum_lifetime != 0 &&
  795. expiry->flow->flow_last.tv_sec -
  796. expiry->flow->flow_start.tv_sec >= ft->param.maximum_lifetime)
  797. expiry->reason = R_MAXLIFE;
  798. if (verbose_flag)
  799. logit (LOG_DEBUG,
  800. "Queuing flow seq:%" PRIu64 " (%p) for expiry "
  801. "reason %d", expiry->flow->flow_seq,
  802. expiry->flow, expiry->reason);
  803. /* Add to array of expired flows */
  804. oldexp = expired_flows;
  805. expired_flows = realloc (expired_flows,
  806. sizeof (*expired_flows) * (num_expired + 1));
  807. /* Don't fatal on realloc failures */
  808. if (expired_flows == NULL)
  809. expired_flows = oldexp;
  810. else {
  811. expired_flows[num_expired] = expiry->flow;
  812. num_expired++;
  813. }
  814. if (ex == CE_EXPIRE_ALL)
  815. expiry->reason = R_FLUSH;
  816. update_expiry_stats (ft, expiry);
  817. /* Remove from flow tree, destroy expiry event */
  818. FLOW_REMOVE (FLOWS, &ft->flows, expiry->flow);
  819. EXPIRY_REMOVE (EXPIRIES, &ft->expiries, expiry);
  820. expiry->flow->expiry = NULL;
  821. expiry_put (ft, expiry);
  822. ft->param.num_flows--;
  823. }
  824. }
  825. if (verbose_flag)
  826. logit (LOG_DEBUG, "Finished scan %d flow(s) to be evicted", num_expired);
  827. /* Processing for expired flows */
  828. if (num_expired > 0) {
  829. if (target != NULL) {
  830. struct SENDPARAMETER sp =
  831. { expired_flows, num_expired, target, if_index, &ft->param,
  832. verbose_flag
  833. };
  834. netflow_send_func_t *func =
  835. ft->param.bidirection ==
  836. 1 ? target->dialect->bidir_func : target->dialect->func;
  837. if (func == NULL) {
  838. func = target->dialect->func;
  839. }
  840. #ifdef ENABLE_PTHREAD
  841. if (use_thread) {
  842. pthread_t write_thread = 0;
  843. sp.flows = calloc (num_expired, sizeof (struct FLOW));
  844. memcpy (sp.flows, expired_flows, sizeof (struct FLOW) * num_expired);
  845. if (pthread_create (&write_thread, NULL, (void *) func, (void *) &sp)
  846. < 0) {
  847. perror ("pthread_create error");
  848. exit (1);
  849. }
  850. if (pthread_detach (write_thread) != 0) {
  851. perror ("pthread_detach error");
  852. exit (1);
  853. }
  854. r = 1;
  855. } else
  856. #endif /* ENABLE_PTHREAD */
  857. r = func (sp);
  858. if (verbose_flag)
  859. logit (LOG_DEBUG, "sent %d netflow packets", r);
  860. if (r <= 0)
  861. ft->param.flows_dropped += num_expired * 2; /* XXX what if r < num_expired * 2 ? */
  862. }
  863. for (i = 0; i < num_expired; i++) {
  864. if (verbose_flag) {
  865. logit (LOG_DEBUG, "EXPIRED: %s (%p)",
  866. format_flow (expired_flows[i]), expired_flows[i]);
  867. }
  868. update_statistics (ft, expired_flows[i]);
  869. flow_put (ft, expired_flows[i]);
  870. }
  871. free (expired_flows);
  872. }
  873. return (r == -1 ? -1 : num_expired);
  874. }
  875. /*
  876. * Force expiry of num_to_expire flows (e.g. when flow table overfull)
  877. */
  878. static void
  879. force_expire (struct FLOWTRACK *ft, u_int32_t num_to_expire) {
  880. struct EXPIRY *expiry, **expiryv;
  881. int i;
  882. /* XXX move all overflow processing here (maybe) */
  883. if (verbose_flag)
  884. logit (LOG_INFO, "Forcing expiry of %d flows", num_to_expire);
  885. /*
  886. * Do this in two steps, as it is dangerous to change a key on
  887. * a tree entry without first removing it and then re-adding it.
  888. * It is even worse when this has to be done during a FOREACH :)
  889. * To get around this, we make a list of expired flows and _then_
  890. * alter them
  891. */
  892. if ((expiryv = calloc (num_to_expire, sizeof (*expiryv))) == NULL) {
  893. /*
  894. * On malloc failure, expire ALL flows. I assume that
  895. * setting all the keys in a tree to the same value is
  896. * safe.
  897. */
  898. logit (LOG_ERR, "Out of memory while expiring flows - "
  899. "all flows expired");
  900. EXPIRY_FOREACH (expiry, EXPIRIES, &ft->expiries) {
  901. expiry->expires_at = 0;
  902. expiry->reason = R_OVERFLOWS;
  903. ft->param.flows_force_expired++;
  904. }
  905. return;
  906. }
  907. /* Make the list of flows to expire */
  908. i = 0;
  909. EXPIRY_FOREACH (expiry, EXPIRIES, &ft->expiries) {
  910. if (i >= num_to_expire)
  911. break;
  912. expiryv[i++] = expiry;
  913. }
  914. if (i < num_to_expire) {
  915. logit (LOG_ERR, "Needed to expire %d flows, "
  916. "but only %d active", num_to_expire, i);
  917. num_to_expire = i;
  918. }
  919. for (i = 0; i < num_to_expire; i++) {
  920. EXPIRY_REMOVE (EXPIRIES, &ft->expiries, expiryv[i]);
  921. expiryv[i]->expires_at = 0;
  922. expiryv[i]->reason = R_OVERFLOWS;
  923. EXPIRY_INSERT (EXPIRIES, &ft->expiries, expiryv[i]);
  924. }
  925. ft->param.flows_force_expired += num_to_expire;
  926. free (expiryv);
  927. /* XXX - this is overcomplicated, perhaps use a separate queue */
  928. }
  929. /* Delete all flows that we know about without processing */
  930. static int
  931. delete_all_flows (struct FLOWTRACK *ft) {
  932. struct FLOW *flow, *nflow;
  933. int i;
  934. i = 0;
  935. for (flow = FLOW_MIN (FLOWS, &ft->flows); flow != NULL; flow = nflow) {
  936. nflow = FLOW_NEXT (FLOWS, &ft->flows, flow);
  937. FLOW_REMOVE (FLOWS, &ft->flows, flow);
  938. EXPIRY_REMOVE (EXPIRIES, &ft->expiries, flow->expiry);
  939. expiry_put (ft, flow->expiry);
  940. ft->param.num_flows--;
  941. flow_put (ft, flow);
  942. i++;
  943. }
  944. return (i);
  945. }
  946. /*
  947. * Log our current status.
  948. * Includes summary counters and (in verbose mode) the list of current flows
  949. * and the tree of expiry events.
  950. */
  951. static int
  952. statistics (struct FLOWTRACK *ft, FILE * out, pcap_t * pcap) {
  953. int i;
  954. struct protoent *pe;
  955. char proto[32];
  956. struct pcap_stat ps;
  957. fprintf (out, "Number of active flows: %d\n", ft->param.num_flows);
  958. fprintf (out, "Packets processed: %" PRIu64 "\n", ft->param.total_packets);
  959. if (ft->param.non_sampled_packets)
  960. fprintf (out, "Packets non-sampled: %" PRIu64 "\n",
  961. ft->param.non_sampled_packets);
  962. fprintf (out, "Fragments: %" PRIu64 "\n", ft->param.frag_packets);
  963. fprintf (out,
  964. "Ignored packets: %" PRIu64 " (%" PRIu64 " non-IP, %" PRIu64
  965. " too short)\n", ft->param.non_ip_packets + ft->param.bad_packets,
  966. ft->param.non_ip_packets, ft->param.bad_packets);
  967. fprintf (out, "Flows expired: %" PRIu64 " (%" PRIu64 " forced)\n",
  968. ft->param.flows_expired, ft->param.flows_force_expired);
  969. fprintf (out,
  970. "Flows exported: %" PRIu64 " (%" PRIu64 " records) in %" PRIu64
  971. " packets (%" PRIu64 " failures)\n", ft->param.flows_exported,
  972. ft->param.records_sent, ft->param.packets_sent,
  973. ft->param.flows_dropped);
  974. if (pcap_stats (pcap, &ps) == 0) {
  975. fprintf (out, "Packets received by libpcap: %lu\n",
  976. (unsigned long) ps.ps_recv);
  977. fprintf (out, "Packets dropped by libpcap: %lu\n",
  978. (unsigned long) ps.ps_drop);
  979. fprintf (out, "Packets dropped by interface: %lu\n",
  980. (unsigned long) ps.ps_ifdrop);
  981. }
  982. fprintf (out, "\n");
  983. if (ft->param.flows_expired != 0) {
  984. fprintf (out,
  985. "Expired flow statistics: minimum average maximum\n");
  986. fprintf (out, " Flow bytes: %12.0f %12.0f %12.0f\n",
  987. ft->param.octets.min, ft->param.octets.mean,
  988. ft->param.octets.max);
  989. fprintf (out, " Flow packets: %12.0f %12.0f %12.0f\n",
  990. ft->param.packets.min, ft->param.packets.mean,
  991. ft->param.packets.max);
  992. fprintf (out, " Duration: %12.2fs %12.2fs %12.2fs\n",
  993. ft->param.duration.min, ft->param.duration.mean,
  994. ft->param.duration.max);
  995. fprintf (out, "\n");
  996. fprintf (out, "Expired flow reasons:\n");
  997. fprintf (out, " tcp = %9" PRIu64 " tcp.rst = %9" PRIu64 " "
  998. "tcp.fin = %9" PRIu64 "\n", ft->param.expired_tcp,
  999. ft->param.expired_tcp_rst, ft->param.expired_tcp_fin);
  1000. fprintf (out,
  1001. " udp = %9" PRIu64 " icmp = %9" PRIu64 " "
  1002. "general = %9" PRIu64 "\n", ft->param.expired_udp,
  1003. ft->param.expired_icmp, ft->param.expired_general);
  1004. fprintf (out, " maxlife = %9" PRIu64 "\n", ft->param.expired_maxlife);
  1005. fprintf (out, "over 2 GiB = %9" PRIu64 "\n", ft->param.expired_overbytes);
  1006. fprintf (out, " maxflows = %9" PRIu64 "\n", ft->param.expired_maxflows);
  1007. fprintf (out, " flushed = %9" PRIu64 "\n", ft->param.expired_flush);
  1008. fprintf (out, "\n");
  1009. fprintf (out, "Per-protocol statistics: Octets "
  1010. "Packets Avg Life Max Life\n");
  1011. for (i = 0; i < 256; i++) {
  1012. if (ft->param.packets_pp[i]) {
  1013. pe = getprotobynumber (i);
  1014. snprintf (proto, sizeof (proto), "%s (%d)",
  1015. pe != NULL ? pe->p_name : "Unknown", i);
  1016. fprintf (out, " %17s: %14" PRIu64 " %12" PRIu64 " %8.2fs "
  1017. "%10.2fs\n", proto,
  1018. ft->param.octets_pp[i],
  1019. ft->param.packets_pp[i],
  1020. ft->param.duration_pp[i].mean, ft->param.duration_pp[i].max);
  1021. }
  1022. }
  1023. }
  1024. return (0);
  1025. }
  1026. static void
  1027. dump_flows (struct FLOWTRACK *ft, FILE * out) {
  1028. struct EXPIRY *expiry;
  1029. time_t now;
  1030. now = time (NULL);
  1031. EXPIRY_FOREACH (expiry, EXPIRIES, &ft->expiries) {
  1032. fprintf (out, "ACTIVE %s\n", format_flow (expiry->flow));
  1033. if ((long int) expiry->expires_at - now < 0) {
  1034. fprintf (out,
  1035. "EXPIRY EVENT for flow %" PRIu64 " now%s\n",
  1036. expiry->flow->flow_seq,
  1037. expiry->expires_at == 0 ? " (FORCED)" : "");
  1038. } else {
  1039. fprintf (out,
  1040. "EXPIRY EVENT for flow %" PRIu64 " in %ld seconds\n",
  1041. expiry->flow->flow_seq, (long int) expiry->expires_at - now);
  1042. }
  1043. fprintf (out, "\n");
  1044. }
  1045. }
  1046. /*
  1047. * Figure out how many bytes to skip from front of packet to get past
  1048. * datalink headers. If pkt is specified, also check whether determine
  1049. * whether or not it is one that we are interested in (IPv4 or IPv6 for now)
  1050. *
  1051. * Returns number of bytes to skip or -1 to indicate that entire
  1052. * packet should be skipped
  1053. */
  1054. static int
  1055. datalink_check (int linktype, const u_int8_t * pkt, u_int32_t caplen, int *af,
  1056. struct ether_header **ether, u_int16_t * vlanid,
  1057. u_int8_t * num_label) {
  1058. int i, j;
  1059. u_int32_t frametype;
  1060. int vlan_size = 0;
  1061. static const struct DATALINK *dl = NULL;
  1062. /* Try to cache last used linktype */
  1063. if (dl == NULL || dl->dlt != linktype) {
  1064. for (i = 0; lt[i].dlt != linktype && lt[i].dlt != -1; i++);
  1065. dl = &lt[i];
  1066. }
  1067. if (dl->dlt == -1 || pkt == NULL)
  1068. return (dl->dlt);
  1069. if (caplen <= dl->skiplen)
  1070. return (-1);
  1071. /* Suck out the frametype */
  1072. frametype = 0;
  1073. /* Processing 802.1Q vlan in ethernet */
  1074. if (linktype == DLT_EN10MB) {
  1075. if (ether != NULL)
  1076. *ether = (struct ether_header *) pkt;
  1077. for (j = 0; j < dl->ft_len; j++) {
  1078. frametype <<= 8;
  1079. frametype |= pkt[j + dl->ft_off];
  1080. }
  1081. frametype &= dl->ft_mask;
  1082. if (frametype == ETHERTYPE_VLAN) {
  1083. for (j = 0; j < 2; j++) {
  1084. *vlanid <<= 8;
  1085. *vlanid |= pkt[j + dl->skiplen];
  1086. }
  1087. /*
  1088. * Mask out the PCP and DEI values,
  1089. * leaving just the VID.
  1090. */
  1091. *vlanid &= 0xFFF;
  1092. vlan_size = 4;
  1093. }
  1094. }
  1095. frametype = 0;
  1096. if (dl->ft_is_be) {
  1097. for (j = 0; j < dl->ft_len; j++) {
  1098. frametype <<= 8;
  1099. frametype |= pkt[j + dl->ft_off + vlan_size];
  1100. }
  1101. } else {
  1102. for (j = dl->ft_len - 1; j >= 0; j--) {
  1103. frametype <<= 8;
  1104. frametype |= pkt[j + dl->ft_off + vlan_size];
  1105. }
  1106. }
  1107. frametype &= dl->ft_mask;
  1108. if (frametype == dl->ft_v4)
  1109. *af = AF_INET;
  1110. else if (frametype == dl->ft_v6)
  1111. *af = AF_INET6;
  1112. else if (frametype == ETH_P_MPLS_UC && num_label != NULL) {
  1113. u_int32_t shim = 0;
  1114. u_int8_t ip_version = 0;
  1115. do {
  1116. shim = *((u_int32_t *) (pkt + dl->skiplen + vlan_size) + *num_label);
  1117. *num_label += 1;
  1118. } while (!((ntohl (shim) & MPLS_LS_S_MASK) >> MPLS_LS_S_SHIFT));
  1119. ip_version = (pkt[dl->skiplen + vlan_size + *num_label * 4] & 0xf0) >> 4;
  1120. if (ip_version == 4)
  1121. *af = AF_INET;
  1122. else if (ip_version == 6)
  1123. *af = AF_INET6;
  1124. else
  1125. return (-1);
  1126. } else
  1127. return (-1);
  1128. return (dl->skiplen + vlan_size);
  1129. }
  1130. /*
  1131. * Per-packet callback function from libpcap. Pass the packet (if it is IP)
  1132. * sans datalink headers to process_packet.
  1133. */
  1134. void
  1135. flow_cb (u_char * user_data, const struct pcap_pkthdr *phdr,
  1136. const u_char * pkt) {
  1137. int s, af = 0;
  1138. struct CB_CTXT *cb_ctxt = (struct CB_CTXT *) user_data;
  1139. struct timeval tv;
  1140. u_int16_t vlanid = 0;
  1141. struct ether_header *ether = NULL;
  1142. u_char *mpls_hdr = NULL;
  1143. u_int8_t num_label = 0;
  1144. if (cb_ctxt->ft->param.total_packets == 0) {
  1145. if (cb_ctxt->ft->param.adjust_time) {
  1146. cb_ctxt->ft->param.system_boot_time = phdr->ts;
  1147. }
  1148. }
  1149. if (cb_ctxt->ft->param.option.sample &&
  1150. (cb_ctxt->ft->param.total_packets +
  1151. cb_ctxt->ft->param.non_sampled_packets) %
  1152. cb_ctxt->ft->param.option.sample > 0) {
  1153. cb_ctxt->ft->param.non_sampled_packets++;
  1154. return;
  1155. }
  1156. cb_ctxt->ft->param.total_packets++;
  1157. if (cb_ctxt->ft->param.is_psamp) {
  1158. send_psamp (pkt, phdr->caplen, phdr->ts, cb_ctxt->target,
  1159. cb_ctxt->ft->param.total_packets);
  1160. return;
  1161. }
  1162. s = datalink_check (cb_ctxt->linktype, pkt, phdr->caplen, &af, &ether,
  1163. &vlanid, &num_label);
  1164. if (s < 0 || (!cb_ctxt->want_v6 && af == AF_INET6)) {
  1165. cb_ctxt->ft->param.non_ip_packets++;
  1166. cb_ctxt->ft->param.total_packets--;
  1167. } else {
  1168. tv.tv_sec = phdr->ts.tv_sec;
  1169. tv.tv_usec = phdr->ts.tv_usec;
  1170. if (process_packet (cb_ctxt->ft, pkt + s, af, phdr->caplen - s,
  1171. phdr->len - s, ether, vlanid, &tv,
  1172. num_label) == PP_MALLOC_FAIL)
  1173. cb_ctxt->fatal = 1;
  1174. }
  1175. if (cb_ctxt->ft->param.adjust_time)
  1176. cb_ctxt->ft->param.last_packet_time = phdr->ts;
  1177. }
  1178. #ifdef ENABLE_PTHREAD
  1179. static void
  1180. pcap_memcpy (u_char * user_data, const struct pcap_pkthdr *phdr,
  1181. const u_char * pkt) {
  1182. pthread_mutex_lock (&read_mutex);
  1183. memcpy (&packet_header, phdr, sizeof (struct pcap_pkthdr));
  1184. memcpy (&packet_data, pkt, sizeof (packet_data));
  1185. pthread_mutex_unlock (&read_mutex);
  1186. pthread_cond_signal (&read_cond);
  1187. }
  1188. void *
  1189. process_packet_loop (void *arg) {
  1190. while (!graceful_shutdown_request) {
  1191. pthread_mutex_lock (&read_mutex);
  1192. pthread_cond_wait (&read_cond, &read_mutex);
  1193. if (graceful_shutdown_request)
  1194. break;
  1195. flow_cb ((u_char *) arg, &packet_header, (u_char *) & packet_data);
  1196. pthread_mutex_unlock (&read_mutex);
  1197. }
  1198. }
  1199. #endif /* ENABLE_PTHREAD */
  1200. static void
  1201. print_timeouts (struct FLOWTRACK *ft, FILE * out) {
  1202. fprintf (out, " TCP timeout: %ds\n", ft->param.tcp_timeout);
  1203. fprintf (out, " TCP post-RST timeout: %ds\n", ft->param.tcp_rst_timeout);
  1204. fprintf (out, " TCP post-FIN timeout: %ds\n", ft->param.tcp_fin_timeout);
  1205. fprintf (out, " UDP timeout: %ds\n", ft->param.udp_timeout);
  1206. fprintf (out, " ICMP timeout: %ds\n", ft->param.icmp_timeout);
  1207. fprintf (out, " General timeout: %ds\n", ft->param.general_timeout);
  1208. fprintf (out, " Maximum lifetime: %ds\n", ft->param.maximum_lifetime);
  1209. fprintf (out, " Expiry interval: %ds\n", ft->param.expiry_interval);
  1210. }
  1211. static int
  1212. accept_control (int lsock, struct NETFLOW_TARGET *target,
  1213. struct FLOWTRACK *ft, pcap_t * pcap, int *exit_request,
  1214. int *stop_collection_flag) {
  1215. char buf[64], *p;
  1216. FILE *ctlf;
  1217. int fd, ret;
  1218. if ((fd = accept (lsock, NULL, NULL)) == -1) {
  1219. logit (LOG_ERR, "ctl accept: %s - exiting", strerror (errno));
  1220. return (-1);
  1221. }
  1222. if ((ctlf = fdopen (fd, "r+")) == NULL) {
  1223. logit (LOG_ERR, "fdopen: %s - exiting\n", strerror (errno));
  1224. close (fd);
  1225. return (-1);
  1226. }
  1227. setlinebuf (ctlf);
  1228. if (fgets (buf, sizeof (buf), ctlf) == NULL) {
  1229. logit (LOG_ERR, "Control socket yielded no data");
  1230. return (0);
  1231. }
  1232. if ((p = strchr (buf, '\n')) != NULL)
  1233. *p = '\0';
  1234. if (verbose_flag)
  1235. logit (LOG_DEBUG, "Control socket \"%s\"", buf);
  1236. /* XXX - use dispatch table */
  1237. ret = -1;
  1238. if (strcmp (buf, "help") == 0) {
  1239. fprintf (ctlf, "Valid control words are:\n");
  1240. fprintf (ctlf, "\tdebug+ debug- delete-all dump-flows exit "
  1241. "expire-all\n");
  1242. fprintf (ctlf, "\tshutdown start-gather statistics stop-gather "
  1243. "timeouts\n");
  1244. fprintf (ctlf, "\tsend-template\n");
  1245. ret = 0;
  1246. } else if (strcmp (buf, "shutdown") == 0) {
  1247. fprintf (ctlf, "softflowd[%u]: Shutting down gracefully...\n",
  1248. (unsigned int) getpid ());
  1249. graceful_shutdown_request = 1;
  1250. ret = 1;
  1251. } else if (strcmp (buf, "exit") == 0) {
  1252. fprintf (ctlf, "softflowd[%u]: Exiting now...\n",
  1253. (unsigned int) getpid ());
  1254. *exit_request = 1;
  1255. ret = 1;
  1256. } else if (strcmp (buf, "expire-all") == 0) {
  1257. #ifdef ENABLE_LEGACY
  1258. netflow9_resend_template ();
  1259. #else /* ENABLE_LEGACY */
  1260. ipfix_resend_template ();
  1261. #endif /* ENABLE_LEGACY */
  1262. fprintf (ctlf, "softflowd[%u]: Expired %d flows.\n",
  1263. (unsigned int) getpid (), check_expired (ft, target,
  1264. CE_EXPIRE_ALL));
  1265. ret = 0;
  1266. } else if (strcmp (buf, "send-template") == 0) {
  1267. #ifdef ENABLE_LEGACY
  1268. netflow9_resend_template ();
  1269. #else /* ENABLE_LEGACY */
  1270. ipfix_resend_template ();
  1271. #endif /* ENABLE_LEGACY */
  1272. fprintf (ctlf, "softflowd[%u]: Template will be sent at "
  1273. "next flow export\n", (unsigned int) getpid ());
  1274. ret = 0;
  1275. } else if (strcmp (buf, "delete-all") == 0) {
  1276. fprintf (ctlf, "softflowd[%u]: Deleted %d flows.\n",
  1277. (unsigned int) getpid (), delete_all_flows (ft));
  1278. ret = 0;
  1279. } else if (strcmp (buf, "statistics") == 0) {
  1280. fprintf (ctlf, "softflowd[%u]: Accumulated statistics "
  1281. "since %s UTC:\n", (unsigned int) getpid (),
  1282. format_time (ft->param.system_boot_time.tv_sec));
  1283. statistics (ft, ctlf, pcap);
  1284. ret = 0;
  1285. } else if (strcmp (buf, "debug+") == 0) {
  1286. fprintf (ctlf, "softflowd[%u]: Debug level increased.\n",
  1287. (unsigned int) getpid ());
  1288. verbose_flag = 1;
  1289. ret = 0;
  1290. } else if (strcmp (buf, "debug-") == 0) {
  1291. fprintf (ctlf, "softflowd[%u]: Debug level decreased.\n",
  1292. (unsigned int) getpid ());
  1293. verbose_flag = 0;
  1294. ret = 0;
  1295. } else if (strcmp (buf, "stop-gather") == 0) {
  1296. fprintf (ctlf, "softflowd[%u]: Data collection stopped.\n",
  1297. (unsigned int) getpid ());
  1298. *stop_collection_flag = 1;
  1299. ret = 0;
  1300. } else if (strcmp (buf, "start-gather") == 0) {
  1301. fprintf (ctlf, "softflowd[%u]: Data collection resumed.\n",
  1302. (unsigned int) getpid ());
  1303. *stop_collection_flag = 0;
  1304. ret = 0;
  1305. } else if (strcmp (buf, "dump-flows") == 0) {
  1306. fprintf (ctlf, "softflowd[%u]: Dumping flow data:\n",
  1307. (unsigned int) getpid ());
  1308. dump_flows (ft, ctlf);
  1309. ret = 0;
  1310. } else if (strcmp (buf, "timeouts") == 0) {
  1311. fprintf (ctlf, "softflowd[%u]: Printing timeouts:\n",
  1312. (unsigned int) getpid ());
  1313. print_timeouts (ft, ctlf);
  1314. ret = 0;
  1315. } else {
  1316. fprintf (ctlf, "Unknown control command \"%s\"\n", buf);
  1317. ret = 0;
  1318. }
  1319. fclose (ctlf);
  1320. close (fd);
  1321. return (ret);
  1322. }
  /*
   * Create an IPv4 UDP socket bound to portnumber on all local addresses.
   *
   * Returns the socket descriptor, or a negative value when socket() or
   * bind() fails (the error is reported with perror and no descriptor is
   * left open).
   */
  static int
  recvsock (uint16_t portnumber) {
    struct sockaddr_in addr;
    int rsock = socket (AF_INET, SOCK_DGRAM, 0);

    if (rsock < 0) {
      perror ("socket");
      return rsock;
    }
    /* Start from a fully zeroed address so no field is left undefined */
    memset (&addr, 0, sizeof (addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons (portnumber);
    addr.sin_addr.s_addr = htonl (INADDR_ANY);
    if (bind (rsock, (struct sockaddr *) &addr, sizeof (addr)) < 0) {
      /* Report first: close() may overwrite errno */
      perror ("bind");
      close (rsock);
      return -1;
    }
    return rsock;
  }
  1340. #ifdef LINUX
  1341. static void
  1342. bind_device (int sock, char *ifname) {
  1343. struct ifreq ifr;
  1344. memset (&ifr, 0, sizeof (ifr));
  1345. strncpy (ifr.ifr_name, ifname, sizeof (ifr.ifr_name));
  1346. if (setsockopt
  1347. (sock, SOL_SOCKET, SO_BINDTODEVICE, (void *) &ifr, sizeof (ifr)) < 0) {
  1348. perror ("SO_BINDTODEVICE failed");
  1349. }
  1350. }
  1351. #endif /* LINUX */
  /*
   * Create a socket of the address family found in addr, connect it to
   * addr and, where requested, set the multicast TTL / hop limit.
   *
   * protocol selects a datagram socket for IPPROTO_UDP and a stream
   * socket otherwise. A hoplimit of -1 means "not specified": nothing is
   * set for unicast destinations, while multicast destinations default
   * to 1 (link-local).
   *
   * Returns the connected socket. Any failure prints a message to stderr
   * and terminates the process with exit (1).
   */
  static int
  connsock (struct sockaddr_storage *addr, socklen_t len, int hoplimit,
            int protocol) {
    int s;
    unsigned int h6;
    unsigned char h4;
    struct sockaddr_in *in4 = (struct sockaddr_in *) addr;
    struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) addr;

    if ((s =
         socket (addr->ss_family,
                 protocol == IPPROTO_UDP ? SOCK_DGRAM : SOCK_STREAM,
                 protocol)) == -1) {
      fprintf (stderr, "socket() error: %s\n", strerror (errno));
      exit (1);
    }
    if (connect (s, (struct sockaddr *) addr, len) == -1) {
      fprintf (stderr, "connect() error: %s\n", strerror (errno));
      exit (1);
    }
    switch (addr->ss_family) {
    case AF_INET:
      /*
       * Default to link-local TTL for multicast addresses.
       * IN_MULTICAST expects the address in host byte order, while
       * sin_addr.s_addr is stored in network byte order.
       */
      if (hoplimit == -1 && IN_MULTICAST (ntohl (in4->sin_addr.s_addr)))
        hoplimit = 1;
      if (hoplimit == -1)
        break;
      h4 = hoplimit;
      if (setsockopt (s, IPPROTO_IP, IP_MULTICAST_TTL, &h4, sizeof (h4)) == -1) {
        fprintf (stderr, "setsockopt(IP_MULTICAST_TTL, "
                 "%u): %s\n", h4, strerror (errno));
        exit (1);
      }
      break;
    case AF_INET6:
      /* Default to link-local hoplimit for multicast addresses */
      if (hoplimit == -1 && IN6_IS_ADDR_MULTICAST (&in6->sin6_addr))
        hoplimit = 1;
      if (hoplimit == -1)
        break;
      h6 = hoplimit;
      if (setsockopt (s, IPPROTO_IPV6, IPV6_MULTICAST_HOPS,
                      &h6, sizeof (h6)) == -1) {
        fprintf (stderr, "setsockopt(IPV6_MULTICAST_HOPS, %u): "
                 "%s\n", h6, strerror (errno));
        exit (1);
      }
    }
    return (s);
  }
  /*
   * Create the control socket: a listening Unix-domain stream socket
   * bound to path. Whatever exists at path is unlinked first.
   *
   * Returns the listening socket. An over-long path or any failing
   * system call prints a message to stderr and terminates the process
   * with exit (1).
   */
  static int
  unix_listener (const char *path) {
    struct sockaddr_un sun_addr;
    socklen_t sun_addr_len;
    int lsock;

    memset (&sun_addr, '\0', sizeof (sun_addr));
    sun_addr.sun_family = AF_UNIX;
    if (strlcpy (sun_addr.sun_path, path, sizeof (sun_addr.sun_path)) >=
        sizeof (sun_addr.sun_path)) {
      fprintf (stderr, "control socket path too long\n");
      exit (1);
    }
    sun_addr.sun_path[sizeof (sun_addr.sun_path) - 1] = '\0';

    /* Address length: header up to sun_path, the path and its NUL */
    sun_addr_len = offsetof (struct sockaddr_un, sun_path) + strlen (path) + 1;
  #ifdef SOCK_HAS_LEN
    sun_addr.sun_len = sun_addr_len;
  #endif

    lsock = socket (PF_UNIX, SOCK_STREAM, 0);
    if (lsock < 0) {
      fprintf (stderr, "unix domain socket() error: %s\n", strerror (errno));
      exit (1);
    }

    /* Remove whatever is at path so that bind() can create it */
    unlink (path);
    if (bind (lsock, (struct sockaddr *) &sun_addr, sun_addr_len) == -1) {
      fprintf (stderr, "unix domain bind(\"%s\") error: %s\n",
               sun_addr.sun_path, strerror (errno));
      exit (1);
    }
    if (listen (lsock, 64) == -1) {
      fprintf (stderr, "unix domain listen() error: %s\n", strerror (errno));
      exit (1);
    }
    return (lsock);
  }
  1434. static void
  1435. setup_packet_capture (struct pcap **pcap, int *linktype,
  1436. char *dev, char *capfile, char *bpf_prog, int need_v6,
  1437. int promisc, int buffer_size_override) {
  1438. char ebuf[PCAP_ERRBUF_SIZE];
  1439. struct bpf_program prog_c;
  1440. u_int32_t bpf_mask, bpf_net;
  1441. int res;
  1442. /* Open pcap */
  1443. if (dev != NULL) {
  1444. if (!snaplen)
  1445. snaplen = need_v6 ? LIBPCAP_SNAPLEN_V6 : LIBPCAP_SNAPLEN_V4;
  1446. if ((*pcap = pcap_create (dev, ebuf)) == NULL) {
  1447. fprintf (stderr, "pcap_create: %s\n", ebuf);
  1448. exit (1);
  1449. }
  1450. if ((res = pcap_set_snaplen (*pcap, snaplen)) != 0) {
  1451. fprintf (stderr, "pcap_set_snaplen: %s\n", pcap_geterr (*pcap));
  1452. exit (1);
  1453. }
  1454. if ((res = pcap_set_promisc (*pcap, promisc)) != 0) {
  1455. fprintf (stderr, "pcap_set_promisc: %s\n", pcap_geterr (*pcap));
  1456. exit (1);
  1457. }
  1458. if ((res = pcap_set_timeout (*pcap, 0)) != 0) {
  1459. fprintf (stderr, "pcap_set_timeout: %s\n", pcap_geterr (*pcap));
  1460. exit (1);
  1461. }
  1462. if (buffer_size_override > 0)
  1463. if ((res = pcap_set_buffer_size (*pcap, buffer_size_override)) != 0) {
  1464. fprintf (stderr, "pcap_set_buffer_size: %s\n", pcap_geterr (*pcap));
  1465. exit (1);
  1466. }
  1467. if (pcap_lookupnet (dev, &bpf_net, &bpf_mask, ebuf) == -1)
  1468. bpf_net = bpf_mask = 0;
  1469. if ((res = pcap_activate (*pcap)) != 0) {
  1470. fprintf (stderr, "pcap_activate: %s\n", pcap_geterr (*pcap));
  1471. exit (1);
  1472. }
  1473. } else {
  1474. if ((*pcap = pcap_open_offline (capfile, ebuf)) == NULL) {
  1475. fprintf (stderr, "pcap_open_offline(%s): %s\n", capfile, ebuf);
  1476. exit (1);
  1477. }
  1478. bpf_net = bpf_mask = 0;
  1479. }
  1480. *linktype = pcap_datalink (*pcap);
  1481. if (datalink_check (*linktype, NULL, 0, NULL, NULL, NULL, NULL) == -1) {
  1482. fprintf (stderr, "Unsupported datalink type %d\n", *linktype);
  1483. exit (1);
  1484. }
  1485. /* Attach BPF filter, if specified */
  1486. if (bpf_prog != NULL) {
  1487. if (pcap_compile (*pcap, &prog_c, bpf_prog, 1, bpf_mask) == -1) {
  1488. fprintf (stderr, "pcap_compile(\"%s\"): %s\n",
  1489. bpf_prog, pcap_geterr (*pcap));
  1490. exit (1);
  1491. }
  1492. if (pcap_setfilter (*pcap, &prog_c) == -1) {
  1493. fprintf (stderr, "pcap_setfilter: %s\n", pcap_geterr (*pcap));
  1494. exit (1);
  1495. }
  1496. }
  1497. #ifdef BIOCLOCK
  1498. /*
  1499. * If we are reading from an device (not a file), then
  1500. * lock the underlying BPF device to prevent changes in the
  1501. * unprivileged child
  1502. */
  1503. if (dev != NULL && ioctl (pcap_fileno (*pcap), BIOCLOCK) < 0) {
  1504. fprintf (stderr, "ioctl(BIOCLOCK) failed: %s\n", strerror (errno));
  1505. exit (1);
  1506. }
  1507. #endif
  1508. }
  1509. static void
  1510. init_flowtrack (struct FLOWTRACK *ft) {
  1511. /* Set up flow-tracking structure */
  1512. memset (ft, '\0', sizeof (*ft));
  1513. ft->param.next_flow_seq = 1;
  1514. FLOW_INIT (&ft->flows);
  1515. EXPIRY_INIT (&ft->expiries);
  1516. freelist_init (&ft->flow_freelist, sizeof (struct FLOW));
  1517. freelist_init (&ft->expiry_freelist, sizeof (struct EXPIRY));
  1518. ft->param.max_flows = DEFAULT_MAX_FLOWS;
  1519. track_level = ft->param.track_level = TRACK_FULL;
  1520. ft->param.tcp_timeout = DEFAULT_TCP_TIMEOUT;
  1521. ft->param.tcp_rst_timeout = DEFAULT_TCP_RST_TIMEOUT;
  1522. ft->param.tcp_fin_timeout = DEFAULT_TCP_FIN_TIMEOUT;
  1523. ft->param.udp_timeout = DEFAULT_UDP_TIMEOUT;
  1524. ft->param.icmp_timeout = DEFAULT_ICMP_TIMEOUT;
  1525. ft->param.general_timeout = DEFAULT_GENERAL_TIMEOUT;
  1526. ft->param.maximum_lifetime = DEFAULT_MAXIMUM_LIFETIME;
  1527. ft->param.expiry_interval = DEFAULT_EXPIRY_INTERVAL;
  1528. }
  /*
   * Join argv[0] .. argv[argc - 1] into a single newly allocated string,
   * separating consecutive elements with one space.
   *
   * Returns NULL when argc is not positive, otherwise a NUL-terminated
   * heap string that the caller owns. Prints a message and exits if the
   * allocation fails.
   *
   * The total size is computed first so that exactly one allocation is
   * made and every argument is copied once.
   */
  static char *
  argv_join (int argc, char **argv) {
    size_t total, used, len;
    char *ret;
    int i;

    if (argc <= 0)
      return (NULL);

    /* Each argument needs its bytes plus one: a ' ' or the final NUL */
    total = 0;
    for (i = 0; i < argc; i++)
      total += strlen (argv[i]) + 1;

    if ((ret = malloc (total)) == NULL) {
      fprintf (stderr, "Memory allocation failed.\n");
      exit (1);
    }

    used = 0;
    for (i = 0; i < argc; i++) {
      if (i > 0)
        ret[used++] = ' ';
      len = strlen (argv[i]);
      memcpy (ret + used, argv[i], len);
      used += len;
    }
    ret[used] = '\0';
    return (ret);
  }
  1552. /* Display commandline usage information */
  1553. static void
  1554. usage (void) {
  1555. fprintf (stderr,
  1556. "Usage: %s [options] [bpf_program]\n"
  1557. "This is %s version %s. Valid commandline options:\n"
  1558. " -i [idx:]interface Specify interface to listen on\n"
  1559. " -r pcap_file Specify packet capture file to read\n"
  1560. " -t timeout=time Specify named timeout\n"
  1561. " -m max_flows Specify maximum number of flows to track (default %d)\n"
  1562. " -n host:port Send Cisco NetFlow(tm)-compatible packets to host:port\n"
  1563. " -p pidfile Record pid in specified file\n"
  1564. " (default: %s)\n"
  1565. " -c socketfile Location of control socket\n"
  1566. " (default: %s)\n"
  1567. " -v 1|5|9|10|psamp NetFlow export packet version\n"
  1568. " 10 means IPFIX and psamp means PSAMP (packet sampling)\n"
  1569. #ifdef ENABLE_NTOPNG
  1570. " ntopng ntopng means direct injection to NTOPNG (if supported).\n"
  1571. #endif
  1572. " -L hoplimit Set TTL/hoplimit for export datagrams\n"
  1573. " -T full|port|proto|ip| Set flow tracking level (default: full)\n"
  1574. " vlan (\"vlan\" tracking means \"full\" tracking with vlanid)\n"
  1575. " ether (\"ether\" tracking means \"vlan\" tracking with ether header)\n"
  1576. " -6 Track IPv6 flows, regardless of whether selected \n"
  1577. " NetFlow export protocol supports it\n"
  1578. " -d Don't daemonise (run in foreground)\n"
  1579. " -D Debug mode: foreground + verbosity + track v6 flows\n"
  1580. " -P udp|tcp|sctp Specify transport layer protocol for exporting packets\n"
  1581. " -A sec|milli|micro|nano Specify absolute time format form exporting records\n"
  1582. " -s sampling_rate Specify periodical sampling rate (denominator)\n"
  1583. " -B bytes Libpcap buffer size in bytes\n"
  1584. " -b Bidirectional mode in IPFIX (-b work with -v 10)\n"
  1585. " -a Adjusting time for reading pcap file (-a work with -r)\n"
  1586. " -C capture_length Specify length for packet capture (snaplen)\n"
  1587. " -l Load balancing mode for multiple destinations\n"
  1588. " -R receive_port Specify port number for PSAMP receive mode\n"
  1589. #ifdef ENABLE_PTHREAD
  1590. " -M Enable multithread\n"
  1591. #endif /* ENABLE_PTHREAD */
  1592. " -N Disable promiscuous mode\n"
  1593. #ifdef LINUX
  1594. " -S send_interface_name Specify send interface name\n"
  1595. #endif /* LINUX */
  1596. " -x Specify number of MPLS labels\n"
  1597. " -h Display this help\n"
  1598. "\n"
  1599. "Valid timeout names and default values:\n"
  1600. " tcp (default %6d)"
  1601. " tcp.rst (default %6d)"
  1602. " tcp.fin (default %6d)\n"
  1603. " udp (default %6d)"
  1604. " icmp (default %6d)"
  1605. " general (default %6d)\n"
  1606. " maxlife (default %6d)"
  1607. " expint (default %6d)\n"
  1608. "\n",
  1609. PROGNAME, PROGNAME, PROGVER, DEFAULT_MAX_FLOWS, DEFAULT_PIDFILE,
  1610. DEFAULT_CTLSOCK,
  1611. DEFAULT_TCP_TIMEOUT, DEFAULT_TCP_RST_TIMEOUT,
  1612. DEFAULT_TCP_FIN_TIMEOUT, DEFAULT_UDP_TIMEOUT, DEFAULT_ICMP_TIMEOUT,
  1613. DEFAULT_GENERAL_TIMEOUT, DEFAULT_MAXIMUM_LIFETIME,
  1614. DEFAULT_EXPIRY_INTERVAL);
  1615. }
  1616. static void
  1617. set_timeout (struct FLOWTRACK *ft, const char *to_spec) {
  1618. char *name, *value;
  1619. int timeout;
  1620. if ((name = strdup (to_spec)) == NULL) {
  1621. fprintf (stderr, "Out of memory\n");
  1622. exit (1);
  1623. }
  1624. if ((value = strchr (name, '=')) == NULL || *(++value) == '\0') {
  1625. fprintf (stderr, "Invalid -t option \"%s\".\n", name);
  1626. usage ();
  1627. exit (1);
  1628. }
  1629. *(value - 1) = '\0';
  1630. timeout = convtime (value);
  1631. if (timeout < 0) {
  1632. fprintf (stderr, "Invalid -t timeout.\n");
  1633. usage ();
  1634. exit (1);
  1635. }
  1636. if (strcmp (name, "tcp") == 0)
  1637. ft->param.tcp_timeout = timeout;
  1638. else if (strcmp (name, "tcp.rst") == 0)
  1639. ft->param.tcp_rst_timeout = timeout;
  1640. else if (strcmp (name, "tcp.fin") == 0)
  1641. ft->param.tcp_fin_timeout = timeout;
  1642. else if (strcmp (name, "udp") == 0)
  1643. ft->param.udp_timeout = timeout;
  1644. else if (strcmp (name, "icmp") == 0)
  1645. ft->param.icmp_timeout = timeout;
  1646. else if (strcmp (name, "general") == 0)
  1647. ft->param.general_timeout = timeout;
  1648. else if (strcmp (name, "maxlife") == 0)
  1649. ft->param.maximum_lifetime = timeout;
  1650. else if (strcmp (name, "expint") == 0)
  1651. ft->param.expiry_interval = timeout;
  1652. else {
  1653. fprintf (stderr, "Invalid -t name.\n");
  1654. usage ();
  1655. exit (1);
  1656. }
  1657. if (ft->param.general_timeout == 0) {
  1658. fprintf (stderr, "\"general\" flow timeout must be "
  1659. "greater than zero\n");
  1660. exit (1);
  1661. }
  1662. free (name);
  1663. }
  /*
   * Resolve one "-n" destination of the form "host:port" or "[host]:port"
   * and copy the first address found into 'addr'.
   *
   * s    - destination text; the last ':' separates host from port
   * addr - buffer that receives the resolved socket address
   * len  - in: size of 'addr'; out: length of the address stored
   *
   * A missing host or port, a failed lookup, or an address that does not
   * fit in 'addr' prints a message and exits.
   */
  static void
  parse_hostport (const char *s, struct sockaddr *addr, socklen_t * len) {
    char *orig, *host, *port;
    struct addrinfo hints, *res = NULL;
    int herr;

    if ((host = orig = strdup (s)) == NULL) {
      fprintf (stderr, "Out of memory\n");
      exit (1);
    }
    /* Both the host part and the port part must be non-empty */
    if ((port = strrchr (host, ':')) == NULL ||
        port == host || *(++port) == '\0') {
      fprintf (stderr, "Invalid -n argument.\n");
      usage ();
      exit (1);
    }
    *(port - 1) = '\0';
    /* Accept [host]:port for numeric IPv6 addresses */
    if (*host == '[' && *(port - 2) == ']') {
      host++;
      *(port - 2) = '\0';
    }

    memset (&hints, '\0', sizeof (hints));
    hints.ai_socktype = SOCK_DGRAM;
    /* getaddrinfo() reports failure with any non-zero EAI_* code */
    if ((herr = getaddrinfo (host, port, &hints, &res)) != 0) {
      fprintf (stderr, "Address lookup failed: %s\n", gai_strerror (herr));
      exit (1);
    }
    if (res == NULL || res->ai_addr == NULL) {
      fprintf (stderr, "No addresses found for [%s]:%s\n", host, port);
      exit (1);
    }
    if (res->ai_addrlen > *len) {
      fprintf (stderr, "Address too long\n");
      exit (1);
    }
    memcpy (addr, res->ai_addr, res->ai_addrlen);
    *len = res->ai_addrlen;

    /* The address has been copied out; release the lookup results */
    freeaddrinfo (res);
    free (orig);
  }
  1703. static int
  1704. parse_hostports (const char *s, struct DESTINATION *dest, int max_dest) {
  1705. int i = 0;
  1706. char *hostport;
  1707. for (hostport = strsep ((char **) &s, ",");
  1708. hostport != NULL && i < max_dest;
  1709. hostport = strsep ((char **) &s, ",")) {
  1710. dest[i].sslen = sizeof (dest[i].ss);
  1711. parse_hostport (hostport, (struct sockaddr *) &dest[i].ss,
  1712. &dest[i].sslen);
  1713. i++;
  1714. }
  1715. return i;
  1716. }
  1717. /*
  1718. * Drop privileges and chroot, will exit on failure
  1719. */
  1720. static void
  1721. drop_privs (void) {
  1722. struct passwd *pw;
  1723. if ((pw = getpwnam (PRIVDROP_USER)) == NULL) {
  1724. logit (LOG_ERR, "Unable to find unprivileged user \"%s\"", PRIVDROP_USER);
  1725. exit (1);
  1726. }
  1727. if (chdir (PRIVDROP_CHROOT_DIR) != 0) {
  1728. logit (LOG_ERR, "Unable to chdir to chroot directory \"%s\": %s",
  1729. PRIVDROP_CHROOT_DIR, strerror (errno));
  1730. exit (1);
  1731. }
  1732. if (chroot (PRIVDROP_CHROOT_DIR) != 0) {
  1733. logit (LOG_ERR, "Unable to chroot to directory \"%s\": %s",
  1734. PRIVDROP_CHROOT_DIR, strerror (errno));
  1735. exit (1);
  1736. }
  1737. if (chdir ("/") != 0) {
  1738. logit (LOG_ERR, "Unable to chdir to chroot root: %s", strerror (errno));
  1739. exit (1);
  1740. }
  1741. if (setgroups (1, &pw->pw_gid) != 0) {
  1742. logit (LOG_ERR, "Couldn't setgroups (%u): %s",
  1743. (unsigned int) pw->pw_gid, strerror (errno));
  1744. exit (1);
  1745. }
  1746. #if defined(HAVE_SETRESGID)
  1747. if (setresgid (pw->pw_gid, pw->pw_gid, pw->pw_gid) == -1)
  1748. #elif defined(HAVE_SETREGID)
  1749. if (setregid (pw->pw_gid, pw->pw_gid) == -1)
  1750. #else
  1751. if (setegid (pw->pw_gid) == -1 || setgid (pw->pw_gid) == -1)
  1752. #endif
  1753. {
  1754. logit (LOG_ERR, "Couldn't set gid (%u): %s",
  1755. (unsigned int) pw->pw_gid, strerror (errno));
  1756. exit (1);
  1757. }
  1758. #if defined(HAVE_SETRESUID)
  1759. if (setresuid (pw->pw_uid, pw->pw_uid, pw->pw_uid) == -1)
  1760. #elif defined(HAVE_SETREUID)
  1761. if (setreuid (pw->pw_uid, pw->pw_uid) == -1)
  1762. #else
  1763. if (seteuid (pw->pw_uid) == -1 || setuid (pw->pw_uid) == -1)
  1764. #endif
  1765. {
  1766. logit (LOG_ERR, "Couldn't set uid (%u): %s",
  1767. (unsigned int) pw->pw_uid, strerror (errno));
  1768. exit (1);
  1769. }
  1770. }
  1771. int
  1772. main (int argc, char **argv) {
  1773. char *dev, *capfile, *bpf_prog;
  1774. const char *pidfile_path, *ctlsock_path;
  1775. extern char *optarg;
  1776. extern int optind;
  1777. int ch, dontfork_flag, linktype, ctlsock, err, always_v6, r, dest_idx;
  1778. int stop_collection_flag, exit_request, hoplimit;
  1779. pcap_t *pcap = NULL;
  1780. struct FLOWTRACK flowtrack;
  1781. struct NETFLOW_TARGET target;
  1782. struct CB_CTXT cb_ctxt;
  1783. struct pollfd pl[2];
  1784. struct DESTINATION *dest;
  1785. int pcap_override_buffer_size = 0;
  1786. int protocol = IPPROTO_UDP;
  1787. int version = 0;
  1788. int rsock = 0, recvport = IPFIX_PORT, recvloop = 0;
  1789. #ifdef LINUX
  1790. char *send_ifname;
  1791. #endif /* LINUX */
  1792. #ifdef ENABLE_PTHREAD
  1793. int use_thread = 0;
  1794. pthread_t read_thread = 0;
  1795. pthread_mutex_init (&read_mutex, NULL);
  1796. pthread_cond_init (&read_cond, NULL);
  1797. #endif /* ENABLE_PTHREAD */
  1798. int use_promisc = 1;
  1799. closefrom (STDERR_FILENO + 1);
  1800. init_flowtrack (&flowtrack);
  1801. memset (&target, '\0', sizeof (target));
  1802. target.dialect = &nf[0];
  1803. hoplimit = -1;
  1804. bpf_prog = NULL;
  1805. ctlsock = -1;
  1806. dev = capfile = NULL;
  1807. pidfile_path = DEFAULT_PIDFILE;
  1808. ctlsock_path = DEFAULT_CTLSOCK;
  1809. dontfork_flag = 0;
  1810. always_v6 = 0;
  1811. while ((ch =
  1812. getopt (argc, argv,
  1813. "6hdDL:T:i:r:f:t:n:m:p:c:v:s:P:A:B:baC:lR:MNS:x:")) != -1) {
  1814. switch (ch) {
  1815. case '6':
  1816. always_v6 = 1;
  1817. break;
  1818. case 'h':
  1819. usage ();
  1820. return (0);
  1821. case 'D':
  1822. verbose_flag = 1;
  1823. always_v6 = 1;
  1824. /* FALLTHROUGH */
  1825. case 'd':
  1826. dontfork_flag = 1;
  1827. break;
  1828. case 'i':
  1829. if (capfile != NULL || dev != NULL) {
  1830. fprintf (stderr, "Packet source already " "specified.\n\n");
  1831. usage ();
  1832. exit (1);
  1833. }
  1834. #if defined(HAVE_STRSEP)
  1835. dev = strsep (&optarg, ":");
  1836. #else /* defined(HAVE_STRSEP) */
  1837. dev = strtok (optarg, ":");
  1838. #endif /* defined(HAVE_STRSEP) */
  1839. if (optarg != NULL) {
  1840. if (strlen (dev) > 0) {
  1841. if_index = (u_int16_t) atoi (dev);
  1842. }
  1843. dev = optarg;
  1844. }
  1845. if (strlen (dev) == 0) {
  1846. fprintf (stderr, "Wrong interface is specified.\n\n");
  1847. usage ();
  1848. exit (1);
  1849. }
  1850. if (verbose_flag)
  1851. fprintf (stderr, "Using %s (idx: %d)\n", dev, if_index);
  1852. strncpy (flowtrack.param.option.interfaceName, dev,
  1853. strlen (dev) <
  1854. sizeof (flowtrack.param.option.interfaceName) ?
  1855. strlen (dev) : sizeof (flowtrack.param.option.interfaceName));
  1856. break;
  1857. case 'r':
  1858. if (capfile != NULL || dev != NULL) {
  1859. fprintf (stderr, "Packet source already " "specified.\n\n");
  1860. usage ();
  1861. exit (1);
  1862. }
  1863. capfile = optarg;
  1864. dontfork_flag = 1;
  1865. ctlsock_path = NULL;
  1866. strncpy (flowtrack.param.option.interfaceName, capfile,
  1867. strlen (capfile) <
  1868. sizeof (flowtrack.param.option.interfaceName) ?
  1869. strlen (capfile) :
  1870. sizeof (flowtrack.param.option.interfaceName));
  1871. break;
  1872. case 't':
  1873. /* Will exit on failure */
  1874. set_timeout (&flowtrack, optarg);
  1875. break;
  1876. case 'T':
  1877. if (strcasecmp (optarg, "full") == 0)
  1878. flowtrack.param.track_level = TRACK_FULL;
  1879. else if (strcasecmp (optarg, "port") == 0)
  1880. flowtrack.param.track_level = TRACK_IP_PROTO_PORT;
  1881. else if (strcasecmp (optarg, "proto") == 0)
  1882. flowtrack.param.track_level = TRACK_IP_PROTO;
  1883. else if (strcasecmp (optarg, "ip") == 0)
  1884. flowtrack.param.track_level = TRACK_IP_ONLY;
  1885. else if (strcasecmp (optarg, "vlan") == 0)
  1886. flowtrack.param.track_level = TRACK_FULL_VLAN;
  1887. else if (strcasecmp (optarg, "ether") == 0)
  1888. flowtrack.param.track_level = TRACK_FULL_VLAN_ETHER;
  1889. else {
  1890. fprintf (stderr, "Unknown flow tracking " "level\n");
  1891. usage ();
  1892. exit (1);
  1893. }
  1894. track_level = flowtrack.param.track_level;
  1895. break;
  1896. case 'L':
  1897. hoplimit = atoi (optarg);
  1898. if (hoplimit < 0 || hoplimit > 255) {
  1899. fprintf (stderr, "Invalid hop limit\n\n");
  1900. usage ();
  1901. exit (1);
  1902. }
  1903. break;
  1904. case 'm':
  1905. if ((flowtrack.param.max_flows = atoi (optarg)) < 0) {
  1906. fprintf (stderr, "Invalid maximum flows\n\n");
  1907. usage ();
  1908. exit (1);
  1909. }
  1910. break;
  1911. case 'n':
  1912. /* Will exit on failure */
  1913. target.num_destinations =
  1914. parse_hostports (optarg, target.destinations,
  1915. SOFTFLOWD_MAX_DESTINATIONS);
  1916. break;
  1917. case 'p':
  1918. pidfile_path = optarg;
  1919. break;
  1920. case 'c':
  1921. if (strcmp (optarg, "none") == 0)
  1922. ctlsock_path = NULL;
  1923. else
  1924. ctlsock_path = optarg;
  1925. break;
  1926. case 'v':
  1927. if (!strncmp (optarg, "psamp", sizeof ("psamp"))) {
  1928. flowtrack.param.is_psamp = 1;
  1929. break;
  1930. }
  1931. #ifdef ENABLE_NTOPNG
  1932. if (!strncmp (optarg, SOFTFLOWD_NF_VERSION_NTOPNG_STRING,
  1933. sizeof (SOFTFLOWD_NF_VERSION_NTOPNG_STRING))) {
  1934. version = SOFTFLOWD_NF_VERSION_NTOPNG;
  1935. }
  1936. #endif /* ENABLE_NTOPNG */
  1937. version = version ? version : atoi (optarg);
  1938. target.dialect = lookup_netflow_sender (version);
  1939. if (target.dialect == NULL) {
  1940. fprintf (stderr, "Invalid NetFlow version\n");
  1941. exit (1);
  1942. }
  1943. break;
  1944. case 's':
  1945. flowtrack.param.option.sample = atoi (optarg);
  1946. if (flowtrack.param.option.sample < 2) {
  1947. flowtrack.param.option.sample = 0;
  1948. }
  1949. break;
  1950. case 'B':
  1951. pcap_override_buffer_size = atoi (optarg);
  1952. break;
  1953. case 'P':
  1954. if (strcasecmp (optarg, "udp") == 0)
  1955. protocol = IPPROTO_UDP;
  1956. else if (strcasecmp (optarg, "tcp") == 0)
  1957. protocol = IPPROTO_TCP;
  1958. #ifdef IPPROTO_SCTP
  1959. else if (strcasecmp (optarg, "sctp") == 0)
  1960. protocol = IPPROTO_SCTP;
  1961. #endif
  1962. else {
  1963. fprintf (stderr, "Unknown transport layer protocol" "\n");
  1964. usage ();
  1965. exit (1);
  1966. }
  1967. break;
  1968. case 'A':
  1969. if (strcasecmp (optarg, "sec") == 0)
  1970. flowtrack.param.time_format = 's';
  1971. else if (strcasecmp (optarg, "milli") == 0)
  1972. flowtrack.param.time_format = 'm';
  1973. else if (strcasecmp (optarg, "micro") == 0)
  1974. flowtrack.param.time_format = 'M';
  1975. else if (strcasecmp (optarg, "nano") == 0)
  1976. flowtrack.param.time_format = 'n';
  1977. else {
  1978. fprintf (stderr, "Unknown time format" "\n");
  1979. usage ();
  1980. exit (1);
  1981. }
  1982. break;
  1983. case 'b':
  1984. flowtrack.param.bidirection = 1;
  1985. break;
  1986. case 'a':
  1987. flowtrack.param.adjust_time = 1;
  1988. break;
  1989. case 'C': /* Capture Length */
  1990. snaplen = atoi (optarg);
  1991. break;
  1992. case 'l': // load balancing
  1993. target.is_loadbalance = 1;
  1994. break;
  1995. case 'R':
  1996. recvport = atoi (optarg);
  1997. if (recvport < 0 && recvport > 65535)
  1998. recvport = IPFIX_PORT;
  1999. rsock = recvsock ((uint16_t) recvport);
  2000. break;
  2001. case 'M':
  2002. #ifdef ENABLE_PTHREAD
  2003. use_thread = 1;
  2004. #endif /* ENABLE_PTHREAD */
  2005. break;
  2006. case 'N':
  2007. use_promisc = 0;
  2008. break;
  2009. #ifdef LINUX
  2010. case 'S':
  2011. send_ifname = optarg;
  2012. break;
  2013. #endif /* LINUX */
  2014. case 'x':
  2015. flowtrack.param.max_num_label = atoi (optarg);
  2016. if (flowtrack.param.max_num_label < 0
  2017. || flowtrack.param.max_num_label > 10) {
  2018. fprintf (stderr, "Invalid number of MPLS label\n\n");
  2019. usage ();
  2020. exit (1);
  2021. }
  2022. break;
  2023. default:
  2024. fprintf (stderr, "Invalid commandline option.\n");
  2025. usage ();
  2026. exit (1);
  2027. }
  2028. }
  2029. if (capfile == NULL && dev == NULL && rsock <= 0) {
  2030. fprintf (stderr, "-i, -r or -R option not specified.\n");
  2031. usage ();
  2032. exit (1);
  2033. }
  2034. /* join remaining arguments (if any) into bpf program */
  2035. bpf_prog = argv_join (argc - optind, argv + optind);
  2036. /* Will exit on failure */
  2037. if (capfile != NULL || dev != NULL)
  2038. setup_packet_capture (&pcap, &linktype, dev, capfile, bpf_prog,
  2039. target.dialect->v6_capable || always_v6,
  2040. use_promisc, pcap_override_buffer_size);
  2041. else if (rsock > 0)
  2042. linktype = 1; //LINKTYPE_ETHERNET
  2043. /* Netflow send socket */
  2044. for (dest_idx = 0; dest_idx < target.num_destinations; dest_idx++) {
  2045. dest = &target.destinations[dest_idx];
  2046. if (dest->ss.ss_family != 0) {
  2047. if ((err = getnameinfo ((struct sockaddr *) &dest->ss, dest->sslen,
  2048. dest->hostname, sizeof (dest->hostname),
  2049. dest->servname, sizeof (dest->servname),
  2050. NI_NUMERICHOST | NI_NUMERICSERV)) == -1) {
  2051. fprintf (stderr, "getnameinfo: %d\n", err);
  2052. exit (1);
  2053. }
  2054. #ifdef ENABLE_NTOPNG
  2055. if (target.dialect->version == SOFTFLOWD_NF_VERSION_NTOPNG) {
  2056. int rc = connect_ntopng (dest->hostname, dest->servname, &dest->zmq);
  2057. if (rc) {
  2058. fprintf (stderr,
  2059. "Could not create ZeroMQ socket for %s:%s: (%d) %s\n",
  2060. dest->hostname, dest->servname, rc, strerror (rc));
  2061. exit (1);
  2062. }
  2063. } else
  2064. #endif
  2065. dest->sock = connsock (&dest->ss, dest->sslen, hoplimit, protocol);
  2066. #ifdef LINUX
  2067. if (dest->sock > 0 && send_ifname != NULL) {
  2068. bind_device (dest->sock, send_ifname);
  2069. }
  2070. #endif /* LINUX */
  2071. }
  2072. }
  2073. /* Control socket */
  2074. if (ctlsock_path != NULL)
  2075. ctlsock = unix_listener (ctlsock_path); /* Will exit on fail */
  2076. if (dontfork_flag) {
  2077. loginit (PROGNAME, 1);
  2078. } else {
  2079. FILE *pidfile;
  2080. r = daemon (0, 0);
  2081. loginit (PROGNAME, 0);
  2082. if ((pidfile = fopen (pidfile_path, "r")) != NULL) {
  2083. int pid;
  2084. if (fscanf (pidfile, "%u", &pid) == EOF) {
  2085. //fscanf error
  2086. if (ferror (pidfile)) {
  2087. perror ("fscanf");
  2088. }
  2089. }
  2090. fclose (pidfile);
  2091. /* Check if the pid exists */
  2092. int pidfree = (kill (pid, 0) && errno == ESRCH);
  2093. if (!pidfree) {
  2094. fprintf (stderr, "Already running under pid %u\n", pid);
  2095. exit (1);
  2096. }
  2097. }
  2098. if ((pidfile = fopen (pidfile_path, "w")) == NULL) {
  2099. fprintf (stderr, "Couldn't open pidfile %s: %s\n",
  2100. pidfile_path, strerror (errno));
  2101. exit (1);
  2102. }
  2103. fprintf (pidfile, "%u\n", (unsigned int) getpid ());
  2104. fclose (pidfile);
  2105. signal (SIGINT, sighand_graceful_shutdown);
  2106. signal (SIGTERM, sighand_graceful_shutdown);
  2107. signal (SIGSEGV, sighand_other);
  2108. setprotoent (1);
  2109. drop_privs ();
  2110. }
  2111. logit (LOG_NOTICE, "%s v%s starting data collection", PROGNAME, PROGVER);
  2112. for (dest_idx = 0; dest_idx < target.num_destinations; dest_idx++) {
  2113. dest = &target.destinations[dest_idx];
  2114. if (dest->ss.ss_family != 0) {
  2115. logit (LOG_NOTICE, "Exporting flows from %s to [%s]:%s",
  2116. flowtrack.param.option.interfaceName,
  2117. dest->hostname, dest->servname);
  2118. }
  2119. }
  2120. flowtrack.param.option.meteringProcessId = getpid ();
  2121. /* Main processing loop */
  2122. gettimeofday (&flowtrack.param.system_boot_time, NULL);
  2123. stop_collection_flag = 0;
  2124. memset (&cb_ctxt, '\0', sizeof (cb_ctxt));
  2125. cb_ctxt.ft = &flowtrack;
  2126. cb_ctxt.target = &target;
  2127. cb_ctxt.linktype = linktype;
  2128. cb_ctxt.want_v6 = target.dialect->v6_capable || always_v6;
  2129. #ifdef ENABLE_PTHREAD
  2130. if (use_thread) {
  2131. if (pthread_create
  2132. (&read_thread, NULL, process_packet_loop, (void *) &cb_ctxt) < 0) {
  2133. perror ("pthread_create error");
  2134. exit (1);
  2135. }
  2136. }
  2137. #endif /* ENABLE_PTHREAD */
  2138. for (r = 0; graceful_shutdown_request == 0; r = 0) {
  2139. /*
  2140. * Silly libpcap's timeout function doesn't work, so we
  2141. * do it here (only if we are reading live)
  2142. */
  2143. if (capfile == NULL && (dev != NULL || rsock > 0)) { //online
  2144. memset (pl, '\0', sizeof (pl));
  2145. /* This can only be set via the control socket */
  2146. if (!stop_collection_flag && dev != NULL) {
  2147. pl[0].events = POLLIN | POLLERR | POLLHUP;
  2148. pl[0].fd = pcap_fileno (pcap);
  2149. } else if (!stop_collection_flag && rsock > 0) {
  2150. pl[0].fd = rsock;
  2151. pl[0].events = POLLIN | POLLERR | POLLHUP;
  2152. }
  2153. if (ctlsock != -1) {
  2154. pl[1].fd = ctlsock;
  2155. pl[1].events = POLLIN | POLLERR | POLLHUP;
  2156. }
  2157. r = poll (pl, (ctlsock == -1) ? 1 : 2, next_expire (&flowtrack));
  2158. if (r == -1 && errno != EINTR) {
  2159. logit (LOG_ERR, "Exiting on poll: %s", strerror (errno));
  2160. break;
  2161. }
  2162. }
  2163. /* Accept connection on control socket if present */
  2164. if (ctlsock != -1 && pl[1].revents != 0) {
  2165. if (accept_control (ctlsock, &target, &flowtrack, pcap,
  2166. &exit_request, &stop_collection_flag) != 0)
  2167. break;
  2168. }
  2169. /* If we have data, run it through libpcap */
  2170. if (!stop_collection_flag && (capfile != NULL || pl[0].revents != 0)) {
  2171. if (capfile != NULL || dev != NULL) {
  2172. #ifdef ENABLE_PTHREAD
  2173. if (use_thread)
  2174. r =
  2175. pcap_dispatch (pcap, flowtrack.param.max_flows, pcap_memcpy,
  2176. NULL);
  2177. else
  2178. #endif /* ENABLE_PTHREAD */
  2179. r = pcap_dispatch (pcap, flowtrack.param.max_flows, flow_cb,
  2180. (void *) &cb_ctxt);
  2181. if (r == -1) {
  2182. logit (LOG_ERR, "Exiting on pcap_dispatch: %s", pcap_geterr (pcap));
  2183. break;
  2184. } else if (r == 0 && capfile != NULL) {
  2185. logit (LOG_NOTICE, "Shutting down after " "pcap EOF");
  2186. graceful_shutdown_request = 1;
  2187. break;
  2188. }
  2189. } else if (rsock > 0) {
  2190. for (recvloop = 0;
  2191. recvloop < flowtrack.param.max_flows && pl[0].revents != 0;
  2192. recvloop++) {
  2193. r = recv_psamp (rsock, &cb_ctxt);
  2194. if (r == -1) {
  2195. logit (LOG_ERR, "recv_psamp error");
  2196. break;
  2197. }
  2198. if (recvloop + 1 == flowtrack.param.max_flows) {
  2199. r = poll (pl, 1, next_expire (&flowtrack));
  2200. if (r == -1 && errno != EINTR) {
  2201. logit (LOG_ERR, "Exiting on poll: %s", strerror (errno));
  2202. break;
  2203. }
  2204. }
  2205. }
  2206. }
  2207. }
  2208. r = 0;
  2209. /* Fatal error from per-packet functions */
  2210. if (cb_ctxt.fatal) {
  2211. logit (LOG_WARNING, "Fatal error - exiting immediately");
  2212. break;
  2213. }
  2214. /*
  2215. * Expiry processing happens every recheck_rate seconds
  2216. * or whenever we have exceeded the maximum number of active
  2217. * flows
  2218. */
  2219. if (flowtrack.param.num_flows > flowtrack.param.max_flows ||
  2220. next_expire (&flowtrack) == 0) {
  2221. expiry_check:
  2222. /*
  2223. * If we are reading from a capture file, we never
  2224. * expire flows based on time - instead we only
  2225. * expire flows when the flow table is full.
  2226. */
  2227. if (check_expired (&flowtrack, &target,
  2228. capfile == NULL ? CE_EXPIRE_NORMAL :
  2229. CE_EXPIRE_FORCED) < 0)
  2230. logit (LOG_WARNING, "Unable to export flows");
  2231. /*
  2232. * If we are over max_flows, force-expire the oldest
  2233. * out first and immediately reprocess to evict them
  2234. */
  2235. if (flowtrack.param.num_flows > flowtrack.param.max_flows) {
  2236. force_expire (&flowtrack,
  2237. flowtrack.param.num_flows - flowtrack.param.max_flows);
  2238. goto expiry_check;
  2239. }
  2240. }
  2241. }
  2242. /* Flags set by signal handlers or control socket */
  2243. if (graceful_shutdown_request) {
  2244. logit (LOG_WARNING, "Shutting down on user request");
  2245. check_expired (&flowtrack, &target, CE_EXPIRE_ALL);
  2246. } else if (exit_request)
  2247. logit (LOG_WARNING, "Exiting immediately on user request");
  2248. else
  2249. logit (LOG_ERR, "Exiting immediately on internal error");
  2250. if (capfile != NULL && dontfork_flag)
  2251. statistics (&flowtrack, stdout, pcap);
  2252. #ifdef ENABLE_PTHREAD
  2253. if (use_thread) {
  2254. pthread_cond_signal (&read_cond);
  2255. pthread_join (read_thread, NULL);
  2256. }
  2257. #endif /* ENABLE_PTHREAD */
  2258. pcap_close (pcap);
  2259. for (dest_idx = 0; dest_idx < target.num_destinations; dest_idx++) {
  2260. dest = &target.destinations[dest_idx];
  2261. if (dest->sock != -1)
  2262. close (dest->sock);
  2263. }
  2264. unlink (pidfile_path);
  2265. if (ctlsock_path != NULL)
  2266. unlink (ctlsock_path);
  2267. if (rsock > 0)
  2268. close (rsock);
  2269. return (r == 0 ? 0 : 1);
  2270. }