nmsg 1.3.0
private.h
1/*
2 * Copyright (c) 2023-2024 DomainTools LLC
3 * Copyright (c) 2008-2015, 2019, 2021 by Farsight Security, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18#ifndef NMSG_PRIVATE_H
19#define NMSG_PRIVATE_H
20
21#include "nmsg_port_net.h"
22
23#ifdef HAVE_ENDIAN_H
24# include <endian.h>
25#else
26# ifdef HAVE_SYS_ENDIAN_H
27# include <sys/endian.h>
28# endif
29#endif
30
31#include <sys/types.h>
32#include <sys/socket.h>
33#include <sys/time.h>
34#include <sys/stat.h>
35#include <assert.h>
36#include <ctype.h>
37#include <errno.h>
38#include <fcntl.h>
39#include <inttypes.h>
40#include <limits.h>
41#include <pthread.h>
42#include <poll.h>
43#include <signal.h>
44#include <stdarg.h>
45#include <stdatomic.h>
46#include <stdbool.h>
47#include <stddef.h>
48#include <stdio.h>
49#include <stdint.h>
50#include <stdlib.h>
51#include <string.h>
52#include <strings.h>
53#include <time.h>
54#include <unistd.h>
55#include <arpa/inet.h>
56
57#include <zlib.h>
58
59#include <protobuf-c/protobuf-c.h>
60
61#ifdef HAVE_LIBZMQ
62# include <zmq.h>
63#endif /* HAVE_LIBZMQ */
64
65#ifdef HAVE_LIBRDKAFKA
66#include <librdkafka/rdkafka.h>
67#endif /* HAVE_LIBRDKAFKA */
68
69#ifdef HAVE_JSON_C
70#include <json.h>
71#endif /* HAVE_JSON_C */
72
73#include "nmsg.h"
74#include "nmsg.pb-c.h"
75
76#include "fltmod_plugin.h"
77#include "statsmod_plugin.h"
78#include "msgmod_plugin.h"
79#include "ipreasm.h"
80#include "nmsg_json.h"
81
82#include "libmy/crc32c.h"
83#include "libmy/list.h"
84#include "libmy/tree.h"
85#include "libmy/ubuf.h"
86#include "libmy/b64_decode.h"
87#include "libmy/b64_encode.h"
88#include "libmy/vector.h"
89#include "libmy/fast_inet_ntop.h"
90
91#include "config_file.h"
92
93#ifdef HAVE_LIBRDKAFKA
94#include "kafkaio.h"
95#endif /* HAVE_LIBRDKAFKA */
96
97/* Macros. */
98
/*
 * Two-step stringification: STR() quotes its argument verbatim, while
 * XSTR() lets the argument be macro-expanded first and then quotes the
 * result.
 */
#define STR(x) #x
#define XSTR(x) STR(x)

/* NOTE(review): both GC intervals are presumably in seconds -- confirm at the use sites. */
#define NMSG_SEQSRC_GC_INTERVAL 120
#define NMSG_FRAG_GC_INTERVAL 30
/* Number of nanoseconds in one second. */
#define NMSG_NSEC_PER_SEC 1000000000

/* Size of the fixed char buffer embedded in the internal strbuf storage wrapper. */
#define DEFAULT_STRBUF_ALLOC_SZ 16384

/*
 * Module name prefixes: a fixed stem immediately followed by the
 * stringified plugin API version macro of the respective module kind.
 */
#define NMSG_FLT_MODULE_PREFIX "nmsg_flt" XSTR(NMSG_FLTMOD_VERSION)
#define NMSG_STATS_MODULE_PREFIX "nmsg_stats" XSTR(NMSG_STATSMOD_VERSION)
#define NMSG_MSG_MODULE_PREFIX "nmsg_msg" XSTR(NMSG_MSGMOD_VERSION)

/* NOTE(review): presumably the filename suffix of loadable modules -- confirm in the module loader. */
#define NMSG_MODULE_SUFFIX ".so"
113
/*
 * Print a printf-style message to stderr when the global debug level
 * (_nmsg_global_debug) is at least `level`. When the message is
 * suppressed, fprintf() is not called and the format arguments are not
 * evaluated.
 *
 * Uses the GNU `##__VA_ARGS__` extension; wrapped in do/while(0) so the
 * macro behaves as a single statement (safe in unbraced if/else).
 */
#define _nmsg_dprintf(level, format, ...) \
do { \
	if (_nmsg_global_debug >= (level)) { \
		fprintf(stderr, format, ##__VA_ARGS__); \
	} \
} while (0)
119
/*
 * Same as _nmsg_dprintf(), but the debug level is taken from the
 * caller-supplied expression `var` instead of the global: the message is
 * written to stderr only when `var >= level`. When suppressed, the
 * format arguments are not evaluated.
 *
 * Uses the GNU `##__VA_ARGS__` extension; wrapped in do/while(0) so the
 * macro behaves as a single statement (safe in unbraced if/else).
 */
#define _nmsg_dprintfv(var, level, format, ...) \
do { \
	if ((var) >= (level)) { \
		fprintf(stderr, format, ##__VA_ARGS__); \
	} \
} while (0)
125
126/* Enums. */
127
/*
 * Kind of a stream; stored in the `type` member of the stream input and
 * stream output structures.
 * NOTE(review): each enumerator presumably names the underlying transport
 * (file, socket, ZeroMQ, Kafka, or a null/no-op stream) -- confirm in the
 * input/output implementations.
 */
typedef enum {
	nmsg_stream_type_file,
	nmsg_stream_type_sock,
	nmsg_stream_type_zmq,
	nmsg_stream_type_kafka,
	nmsg_stream_type_null,
} nmsg_stream_type;
135
136/* Forward. */
137
/*
 * Incomplete-type declarations so that the function types, structures and
 * prototypes in this header can refer to these structs by pointer before
 * (or without) seeing their full definitions.
 */
struct nmsg_brate;
struct nmsg_buf;
struct nmsg_container;
struct nmsg_dlmod;
struct nmsg_frag;
struct nmsg_frag_key;
struct nmsg_frag_tree;
struct nmsg_input;
struct nmsg_json;
struct nmsg_output;
struct nmsg_msgmod;
struct nmsg_msgmod_field;
struct nmsg_msgmod_clos;
struct nmsg_pcap;
struct nmsg_pres;
struct nmsg_stream_input;
struct nmsg_stream_output;
struct nmsg_seqsrc;
struct nmsg_seqsrc_key;
157
158/* Globals. */
159
/*
 * Library-wide state; declared here, defined in a single translation unit
 * elsewhere.
 */
/* NOTE(review): presumably selects whether descriptors are closed automatically -- confirm at the definition. */
extern bool _nmsg_global_autoclose;
/* Debug verbosity threshold consulted by _nmsg_dprintf(). */
extern int _nmsg_global_debug;
/* NOTE(review): presumably the process-wide set of loaded message modules -- confirm at the definition. */
extern struct nmsg_msgmodset *_nmsg_global_msgmodset;
163
164/* Function types. */
165
/*
 * Function-pointer types used to dispatch input and output operations.
 * Each returns an nmsg_res result code.
 */

/* Type of the read_fp member of the nmsg_input structure.
 * NOTE(review): the nmsg_message_t pointer is presumably an out-parameter
 * receiving the message read -- confirm in the readers. */
166typedef nmsg_res (*nmsg_input_read_fp)(struct nmsg_input *, nmsg_message_t *);
/* Type of the read_loop_fp member of the nmsg_input structure; takes an int,
 * a message callback and an opaque pointer.
 * NOTE(review): the int is presumably a message count and the pointer the
 * callback's user data -- confirm in the loop implementations. */
167typedef nmsg_res (*nmsg_input_read_loop_fp)(struct nmsg_input *, int,
168 nmsg_cb_message, void *);
/* Type of the stream_read_fp member of the stream input structure; operates
 * on a pointer to an Nmsg__Nmsg pointer. */
169typedef nmsg_res (*nmsg_input_stream_read_fp)(struct nmsg_input *, Nmsg__Nmsg **);
/* Type of the write_fp member of the nmsg_output structure. */
170typedef nmsg_res (*nmsg_output_write_fp)(struct nmsg_output *, nmsg_message_t);
/* Type of the flush_fp member of the nmsg_output structure. */
171typedef nmsg_res (*nmsg_output_flush_fp)(struct nmsg_output *);
172
173/* Data types. */
174
175/* nmsg_seqsrc */
177 uint64_t sequence_id;
178 sa_family_t af;
179 uint16_t port;
180 union {
181 uint8_t ip4[4];
182 uint8_t ip6[16];
183 };
184};
185
187 ISC_LINK(struct nmsg_seqsrc) link;
188 struct nmsg_seqsrc_key key;
189 uint32_t sequence;
190 uint64_t sequence_id;
191 uint64_t count;
192 uint64_t count_dropped;
193 time_t last;
194 bool init;
195 char addr_str[INET6_ADDRSTRLEN];
196};
197
198/* nmsg_frag: used by nmsg_stream_input */
200 uint32_t id;
201 uint32_t crc;
202 struct sockaddr_storage addr_ss;
203};
204
/*
 * One entry of the fragment tree kept by a stream input (the tree head is
 * declared as RB_HEAD(frag_ent, nmsg_frag)).
 * NOTE(review): presumably tracks the reassembly state of one fragmented
 * container -- confirm in input_frag.c.
 */
205struct nmsg_frag {
206 RB_ENTRY(nmsg_frag) link; /* red-black tree linkage for this entry */
207 struct nmsg_frag_key key; /* lookup key identifying this entry */
208 unsigned last; /* NOTE(review): presumably the index of the last fragment -- confirm */
209 unsigned rem; /* NOTE(review): presumably the number of fragments still missing -- confirm */
210 struct timespec ts; /* NOTE(review): presumably a timestamp used when garbage-collecting stale entries -- confirm */
211 ProtobufCBinaryData *frags; /* NOTE(review): presumably an array of the fragment payloads -- confirm */
212};
213
214/* nmsg_frag_tree: used by nmsg_stream_input */
216 RB_HEAD(frag_ent, nmsg_frag) head;
217};
218
219/* nmsg_buf: used by nmsg_stream_input, nmsg_stream_output */
/*
 * Byte buffer described by three pointers into one allocation:
 * data (start), pos (read cursor) and end (one past the valid bytes).
 */
220struct nmsg_buf {
221 int fd; /* NOTE(review): presumably the descriptor this buffer is filled from or drained to -- confirm in buf.c */
222 size_t bufsz; /* NOTE(review): presumably the allocated size of data, in bytes -- confirm in buf.c */
223 u_char *data; /* allocated data starts here */
224 u_char *pos; /* position of next buffer read */
225 u_char *end; /* one byte beyond valid data */
226};
227
228/* nmsg_pcap: used by nmsg_input */
/*
 * State for a pcap-backed input: two pcap_t handles, an IP reassembly
 * context, and a BPF program with its source text pointer.
 */
229struct nmsg_pcap {
230 int datalink; /* NOTE(review): presumably the pcap datalink (DLT_*) type of handle -- confirm */
231 pcap_t *handle; /* pcap handle */
232 struct _nmsg_ipreasm *reasm; /* IP reassembly context (see ipreasm.h) */
233 u_char *new_pkt; /* NOTE(review): presumably scratch space for a reassembled packet -- confirm */
234
235 pcap_t *user; /* NOTE(review): presumably a second handle used to compile/apply the user filter -- confirm */
236 char *userbpft; /* NOTE(review): presumably the text of the user-supplied BPF filter -- confirm */
237 struct bpf_program userbpf; /* BPF program; NOTE(review): presumably compiled from userbpft -- confirm */
238
239 nmsg_pcap_type type; /* kind of pcap input */
240 bool raw; /* NOTE(review): presumably selects raw packet reads (cf. _input_pcap_read_raw) -- confirm */
241};
242
243/* nmsg_pres: used by nmsg_input and nmsg_output */
/*
 * State for a presentation-format input or output: a stdio stream plus
 * its lock and formatting options.
 */
struct nmsg_pres {
	pthread_mutex_t lock;	/* NOTE(review): presumably serializes access to fp -- confirm */
	FILE *fp;		/* stdio stream being read or written */
	bool flush;		/* NOTE(review): presumably "flush fp after each write" -- confirm */
	char *endline;		/* NOTE(review): presumably the line terminator emitted on output -- confirm */
	unsigned source;	/* NOTE(review): source/operator/group are presumably the */
	unsigned operator;	/* nmsg header values stamped onto messages -- confirm     */
	unsigned group;
};
253
254/* nmsg_json: used by nmsg_input and nmsg_output */
/*
 * State for a JSON-format input or output: a stdio stream, the
 * descriptor it originated from, its lock and header values.
 */
struct nmsg_json {
#ifdef HAVE_JSON_C
	/* Currently guards no members. */
#endif /* HAVE_JSON_C */
	pthread_mutex_t lock;	/* NOTE(review): presumably serializes access to fp -- confirm */
	FILE *fp;		/* stdio stream being read or written */
	int orig_fd;		/* NOTE(review): presumably the descriptor fp was created from -- confirm */
	bool flush;		/* NOTE(review): presumably "flush fp after each write" -- confirm */
	unsigned source;	/* NOTE(review): source/operator/group are presumably the */
	unsigned operator;	/* nmsg header values stamped onto messages -- confirm     */
	unsigned group;
};
266
267/* nmsg_kafka_json: used by nmsg_input and nmsg_output */
269#ifdef HAVE_LIBRDKAFKA
270 kafka_ctx_t ctx;
271 const char *key_field;
272#endif /* HAVE_LIBRDKAFKA */
273 bool flush;
274 unsigned source;
275 unsigned operator;
276 unsigned group;
277};
278
279/* nmsg_stream_input: used by nmsg_input */
281 nmsg_stream_type type;
282 struct nmsg_buf *buf;
283#ifdef HAVE_LIBZMQ
284 void *zmq;
285#endif /* HAVE_LIBZMQ */
286#ifdef HAVE_LIBRDKAFKA
287 kafka_ctx_t kafka;
288#endif /* HAVE_LIBRDKAFKA */
289 Nmsg__Nmsg *nmsg;
290 unsigned np_index;
291 size_t nc_size;
292 struct nmsg_frag_tree nft;
293 struct pollfd pfd;
294 struct timespec now;
295 struct timespec lastgc;
296 unsigned nfrags;
297 unsigned flags;
298 nmsg_zbuf_t zb;
299 u_char *zb_tmp;
300 unsigned source;
301 unsigned operator;
302 unsigned group;
303 bool blocking_io;
304 bool verify_seqsrc;
305 struct nmsg_brate *brate;
306 ISC_LIST(struct nmsg_seqsrc) seqsrcs;
307 struct sockaddr_storage addr_ss;
308 uint64_t count_recv;
309 uint64_t count_drop;
310
311 nmsg_input_stream_read_fp stream_read_fp;
312};
313
314/* nmsg_stream_output: used by nmsg_output */
316 pthread_mutex_t c_lock; /* Container lock. */
317 pthread_mutex_t w_lock; /* Write/Send lock. */
318 nmsg_stream_type type;
319 int fd;
320#ifdef HAVE_LIBZMQ
321 void *zmq;
322#endif /* HAVE_LIBZMQ */
323#ifdef HAVE_LIBRDKAFKA
324 kafka_ctx_t kafka;
325#endif /* HAVE_LIBRDKAFKA */
326 nmsg_container_t c;
327 size_t bufsz;
328 nmsg_random_t random;
329 nmsg_rate_t rate;
330 bool buffered;
331 unsigned source;
332 unsigned operator;
333 unsigned group;
334 bool do_zlib;
335 bool do_sequence;
336 atomic_uint_fast32_t so_sequence_num;
337 uint64_t sequence_id;
338};
339
340/* nmsg_callback_output: used by nmsg_output */
343 void *user;
344};
345
346/* nmsg_callback_input: used by nmsg_input */
349 void *user;
350};
351
352/* nmsg_input */
354 nmsg_input_type type;
355 nmsg_msgmod_t msgmod;
356 void *clos;
357 union {
358 struct nmsg_stream_input *stream;
359 struct nmsg_pcap *pcap;
360 struct nmsg_pres *pres;
361 struct nmsg_json *json;
362 struct nmsg_kafka_json *kafka;
363 struct nmsg_callback_input *callback;
364 };
365 nmsg_input_read_fp read_fp;
366 nmsg_input_read_loop_fp read_loop_fp;
367
368 bool do_filter;
369 unsigned filter_vid;
370 unsigned filter_msgtype;
371 volatile bool stop;
372};
373
374/* nmsg_output */
376 nmsg_output_type type;
377 union {
378 struct nmsg_stream_output *stream;
379 struct nmsg_pres *pres;
380 struct nmsg_json *json;
381 struct nmsg_kafka_json *kafka;
382 struct nmsg_callback_output *callback;
383 };
384 nmsg_output_write_fp write_fp;
385 nmsg_output_flush_fp flush_fp;
386
387 bool do_filter;
388 unsigned filter_vid;
389 unsigned filter_msgtype;
390 volatile bool stop;
391};
392
393/* nmsg_message */
395 nmsg_msgmod_t mod;
396 ProtobufCMessage *message;
398 void *msg_clos;
399 size_t n_allocs;
400 void **allocs;
401 bool updated;
402};
403
431/* dlmod / msgmod / msgmodset */
432
434 ISC_LINK(struct nmsg_dlmod) link;
435 char *path;
436 void *handle;
437};
438
/*
 * Mode stored in the `mode` member of the message-module closure.
 * NOTE(review): presumably distinguishes single-line key/value parsing
 * from multi-line field parsing of presentation input -- confirm in
 * the message-module implementation.
 */
typedef enum nmsg_msgmod_clos_mode {
	nmsg_msgmod_clos_m_keyval,
	nmsg_msgmod_clos_m_multiline
} nmsg_msgmod_clos_mode;
443
445 char *nmsg_pbuf;
446 size_t estsz;
447 nmsg_msgmod_clos_mode mode;
448 struct nmsg_msgmod_field *field;
449 struct nmsg_strbuf *strbufs;
450 void *mod_clos;
451};
452
454 struct nmsg_msgmod **msgtypes;
455 char *vname;
456 size_t nm;
457};
458
460 struct nmsg_msgmod_plugin *plugin;
461 struct nmsg_msgmod_field *fields;
462 struct nmsg_msgmod_field **fields_idx;
463 size_t n_fields;
464};
465
467 ISC_LIST(struct nmsg_dlmod) dlmods;
468 struct nmsg_msgvendor **vendors;
469 size_t nv;
470};
471
472/* internal nmsg_strbuf wrapper to use inexpensive stack allocation by default */
474 struct nmsg_strbuf sb;
475 char fixed[DEFAULT_STRBUF_ALLOC_SZ];
476};
477
478/* Prototypes. */
479
480/* from alias.c */
481
482nmsg_res _nmsg_alias_init(void);
483void _nmsg_alias_fini(void);
484
485/* from buf.c */
486
487ssize_t _nmsg_buf_avail(struct nmsg_buf *buf);
488ssize_t _nmsg_buf_used(struct nmsg_buf *buf);
489struct nmsg_buf * _nmsg_buf_new(size_t sz);
490void _nmsg_buf_destroy(struct nmsg_buf **buf);
491void _nmsg_buf_reset(struct nmsg_buf *buf);
492
493/* from dlmod.c */
494
495struct nmsg_dlmod * _nmsg_dlmod_init(const char *path);
496void _nmsg_dlmod_destroy(struct nmsg_dlmod **dlmod);
497
498/* from msgmod.c */
499
500struct nmsg_msgmod * _nmsg_msgmod_start(struct nmsg_msgmod_plugin *plugin);
501void _nmsg_msgmod_stop(struct nmsg_msgmod **mod);
502
503/* from message.c */
504
505nmsg_res _nmsg_message_init_message(struct nmsg_message *msg);
506nmsg_res _nmsg_message_init_payload(struct nmsg_message *msg);
507nmsg_res _nmsg_message_deserialize(struct nmsg_message *msg);
508nmsg_res _nmsg_message_serialize(struct nmsg_message *msg);
509nmsg_message_t _nmsg_message_from_payload(Nmsg__NmsgPayload *np);
510nmsg_message_t _nmsg_message_dup(struct nmsg_message *msg);
511nmsg_res _nmsg_message_dup_protobuf(const struct nmsg_message *msg, ProtobufCMessage **dst);
512nmsg_res _nmsg_message_to_json(nmsg_output_t output, nmsg_message_t msg, struct nmsg_strbuf *sb);
513#ifdef HAVE_LIBRDKAFKA
514nmsg_res _nmsg_message_get_field_value_as_key(nmsg_message_t msg, const char *name, struct nmsg_strbuf *sb);
515#endif /* HAVE_LIBRDKAFKA */
516
517/* from msgmodset.c */
518
519struct nmsg_msgmodset * _nmsg_msgmodset_init(const char *path);
520void _nmsg_msgmodset_destroy(struct nmsg_msgmodset **);
521
522/* from strbuf.c */
523struct nmsg_strbuf * _nmsg_strbuf_init(struct nmsg_strbuf_storage *sbs);
524void _nmsg_strbuf_destroy(struct nmsg_strbuf_storage *sbs);
525nmsg_res _nmsg_strbuf_expand(struct nmsg_strbuf *sb, size_t size);
/*
 * Takes the string buffer `sb` and returns a char pointer.
 * NOTE(review): presumably hands ownership of the accumulated string to
 * the caller, who must then free it -- confirm in strbuf.c.
 */
char * _nmsg_strbuf_detach(struct nmsg_strbuf *sb);
527
528/* from payload.c */
529void _nmsg_payload_free_all(Nmsg__Nmsg *nc);
530void _nmsg_payload_free_crcs(Nmsg__Nmsg *nc);
531void _nmsg_payload_calc_crcs(Nmsg__Nmsg *nc);
532void _nmsg_payload_free(Nmsg__NmsgPayload **np);
533size_t _nmsg_payload_size(const Nmsg__NmsgPayload *np);
534
535/* from input_frag.c */
536nmsg_res _input_frag_read(nmsg_input_t, Nmsg__Nmsg **, uint8_t *buf, size_t buf_len);
537void _input_frag_destroy(struct nmsg_stream_input *);
538void _input_frag_gc(struct nmsg_stream_input *);
539
540/* from input_nmsg.c */
541bool _input_nmsg_filter(nmsg_input_t, unsigned, Nmsg__NmsgPayload *);
542nmsg_res _input_nmsg_read(nmsg_input_t, nmsg_message_t *);
543nmsg_res _input_nmsg_loop(nmsg_input_t, int, nmsg_cb_message, void *);
544nmsg_res _input_nmsg_unpack_container(nmsg_input_t, Nmsg__Nmsg **, uint8_t *, size_t);
545nmsg_res _input_nmsg_unpack_container2(const uint8_t *, size_t, unsigned, Nmsg__Nmsg **);
546nmsg_res _input_nmsg_read_container_file(nmsg_input_t, Nmsg__Nmsg **);
547nmsg_res _input_nmsg_read_container_sock(nmsg_input_t, Nmsg__Nmsg **);
548#ifdef HAVE_LIBRDKAFKA
549nmsg_res _input_nmsg_read_container_kafka(nmsg_input_t, Nmsg__Nmsg **);
550#endif /* HAVE_LIBRDKAFKA */
551#ifdef HAVE_LIBZMQ
552nmsg_res _input_nmsg_read_container_zmq(nmsg_input_t, Nmsg__Nmsg **);
553#endif /* HAVE_LIBZMQ */
554nmsg_res _input_nmsg_deserialize_header(const uint8_t *, size_t, ssize_t *, unsigned *);
555
556/* from input_callback.c */
557nmsg_res _input_nmsg_read_callback(nmsg_input_t, nmsg_message_t *);
558
559/* from input_nullnmsg.c */
560nmsg_res _input_nmsg_read_null(nmsg_input_t, nmsg_message_t *);
561nmsg_res _input_nmsg_loop_null(nmsg_input_t, int, nmsg_cb_message, void *);
562
563/* from input_pcap.c */
564nmsg_res _input_pcap_read(nmsg_input_t, nmsg_message_t *);
565nmsg_res _input_pcap_read_raw(nmsg_input_t, nmsg_message_t *);
566
567/* from input_pres.c */
568nmsg_res _input_pres_read(nmsg_input_t, nmsg_message_t *);
569
570/* from input_json.c */
571nmsg_res _input_json_read(nmsg_input_t, nmsg_message_t *);
572#ifdef HAVE_LIBRDKAFKA
573nmsg_res _input_kafka_json_read(nmsg_input_t, nmsg_message_t *);
574#endif /* HAVE_LIBRDKAFKA */
575
576/* from input_seqsrc.c */
577struct nmsg_seqsrc * _input_seqsrc_get(nmsg_input_t, Nmsg__Nmsg *);
578void _input_seqsrc_destroy(nmsg_input_t);
579size_t _input_seqsrc_update(nmsg_input_t, struct nmsg_seqsrc *, Nmsg__Nmsg *);
580
581/* from input.c */
582nmsg_input_t _input_open_kafka(void *s);
583
584/* from output.c */
585void _output_stop(nmsg_output_t);
586nmsg_output_t _output_open_kafka(void *s, size_t bufsz);
587
588/* from output_nmsg.c */
589nmsg_res _output_nmsg_flush(nmsg_output_t);
590nmsg_res _output_nmsg_write(nmsg_output_t, nmsg_message_t);
591
592/* from output_pres.c */
593nmsg_res _output_pres_write(nmsg_output_t, nmsg_message_t);
594
595/* from output_json.c */
596nmsg_res _output_json_write(nmsg_output_t, nmsg_message_t);
597#ifdef HAVE_LIBRDKAFKA
598nmsg_res _output_kafka_json_write(nmsg_output_t output, nmsg_message_t msg);
599nmsg_res _output_kafka_json_flush(nmsg_output_t);
600#endif /* HAVE_LIBRDKAFKA */
601
602/* from brate.c */
603struct nmsg_brate * _nmsg_brate_init(size_t target_byte_rate);
604void _nmsg_brate_destroy(struct nmsg_brate **);
605void _nmsg_brate_sleep(struct nmsg_brate *, size_t container_sz, size_t n_payloads, size_t n);
606
607/* from ipdg.c */
608
654_nmsg_ipdg_parse_reasm(struct nmsg_ipdg *dg, unsigned etype, size_t len,
655 const u_char *pkt, struct _nmsg_ipreasm *reasm,
656 unsigned *new_len, u_char *new_pkt, int *defrag,
657 uint64_t timestamp);
658
659#endif /* NMSG_PRIVATE_H */
Implementing message filter modules.
nmsg_input_type
An enum identifying the underlying implementation of an nmsg_input_t object.
Definition: input.h:55
Base nmsg support header.
nmsg_res(* nmsg_cb_message_read)(nmsg_message_t *msg, void *user)
Callback function for generating nmsg messages.
Definition: nmsg.h:96
void(* nmsg_cb_message)(nmsg_message_t msg, void *user)
Callback function for processing nmsg messages.
Definition: nmsg.h:80
nmsg_output_type
An enum identifying the underlying implementation of an nmsg_output_t object.
Definition: output.h:39
nmsg_res
nmsg result code
Definition: res.h:25
Implementing statistics export modules.
an nmsg_message MUST always have a non-NULL ->np member.
Definition: private.h:433
Parsed IP datagram.
Definition: ipdg.h:31
Structure mapping protocol buffer schema fields to nmsg_msgmod_field_type values for "transparent" mo...
Structure exported by message modules to implement a new message type.
String buffer.
Definition: strbuf.h:28