nmsg 1.1.1
private.h
1/*
2 * Copyright (c) 2023 DomainTools LLC
3 * Copyright (c) 2008-2015, 2019, 2021 by Farsight Security, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18#ifndef NMSG_PRIVATE_H
19#define NMSG_PRIVATE_H
20
21#include "nmsg_port_net.h"
22
23#ifdef HAVE_ENDIAN_H
24# include <endian.h>
25#else
26# ifdef HAVE_SYS_ENDIAN_H
27# include <sys/endian.h>
28# endif
29#endif
30
31#include <sys/types.h>
32#include <sys/socket.h>
33#include <sys/time.h>
34#include <sys/stat.h>
35#include <assert.h>
36#include <ctype.h>
37#include <errno.h>
38#include <fcntl.h>
39#include <inttypes.h>
40#include <limits.h>
41#include <pthread.h>
42#include <poll.h>
43#include <signal.h>
44#include <stdarg.h>
45#include <stdbool.h>
46#include <stddef.h>
47#include <stdio.h>
48#include <stdint.h>
49#include <stdlib.h>
50#include <string.h>
51#include <strings.h>
52#include <time.h>
53#include <unistd.h>
54#include <arpa/inet.h>
55
56#include <zlib.h>
57
58#include <protobuf-c/protobuf-c.h>
59
60#ifdef HAVE_LIBZMQ
61# include <zmq.h>
62#endif /* HAVE_LIBZMQ */
63
64#ifdef HAVE_JSON_C
65#include <json.h>
66#endif /* HAVE_JSON_C */
67
68#include "nmsg.h"
69#include "nmsg.pb-c.h"
70
71#include "fltmod_plugin.h"
72#include "msgmod_plugin.h"
73#include "ipreasm.h"
74#include "nmsg_json.h"
75
76#include "libmy/crc32c.h"
77#include "libmy/list.h"
78#include "libmy/tree.h"
79#include "libmy/ubuf.h"
80#include "libmy/b64_decode.h"
81#include "libmy/b64_encode.h"
82#include "libmy/vector.h"
83#include "libmy/fast_inet_ntop.h"
84
85/* Macros. */
86
87#define STR(x) #x
88#define XSTR(x) STR(x)
89
90#define NMSG_SEQSRC_GC_INTERVAL 120
91#define NMSG_FRAG_GC_INTERVAL 30
92#define NMSG_NSEC_PER_SEC 1000000000
93
94#define DEFAULT_STRBUF_ALLOC_SZ 16384
95
96#define NMSG_FLT_MODULE_PREFIX "nmsg_flt" XSTR(NMSG_FLTMOD_VERSION)
97#define NMSG_MSG_MODULE_PREFIX "nmsg_msg" XSTR(NMSG_MSGMOD_VERSION)
98
99#define NMSG_MODULE_SUFFIX ".so"
100
101#define _nmsg_dprintf(level, format, ...) \
102do { \
103 if (_nmsg_global_debug >= (level)) \
104 fprintf(stderr, format, ##__VA_ARGS__); \
105} while (0)
106
107#define _nmsg_dprintfv(var, level, format, ...) \
108do { \
109 if ((var) >= (level)) \
110 fprintf(stderr, format, ##__VA_ARGS__); \
111} while (0)
112
113/* Enums. */
114
115typedef enum {
116 nmsg_stream_type_file,
117 nmsg_stream_type_sock,
118 nmsg_stream_type_zmq,
119 nmsg_stream_type_null,
120} nmsg_stream_type;
121
122/* Forward. */
123
124struct nmsg_brate;
125struct nmsg_buf;
126struct nmsg_container;
127struct nmsg_dlmod;
128struct nmsg_frag;
129struct nmsg_frag_key;
130struct nmsg_frag_tree;
131struct nmsg_input;
132struct nmsg_json;
133struct nmsg_output;
134struct nmsg_msgmod;
135struct nmsg_msgmod_field;
136struct nmsg_msgmod_clos;
137struct nmsg_pcap;
138struct nmsg_pres;
139struct nmsg_stream_input;
140struct nmsg_stream_output;
141struct nmsg_seqsrc;
142struct nmsg_seqsrc_key;
143
144/* Globals. */
145
146extern bool _nmsg_global_autoclose;
147extern int _nmsg_global_debug;
148extern struct nmsg_msgmodset * _nmsg_global_msgmodset;
149
150/* Function types. */
151
152typedef nmsg_res (*nmsg_input_read_fp)(struct nmsg_input *, nmsg_message_t *);
153typedef nmsg_res (*nmsg_input_read_loop_fp)(struct nmsg_input *, int,
154 nmsg_cb_message, void *);
155typedef nmsg_res (*nmsg_input_stream_read_fp)(struct nmsg_input *, Nmsg__Nmsg **);
156typedef nmsg_res (*nmsg_output_write_fp)(struct nmsg_output *, nmsg_message_t);
157typedef nmsg_res (*nmsg_output_flush_fp)(struct nmsg_output *);
158
159/* Data types. */
160
161/* nmsg_seqsrc */
162struct nmsg_seqsrc_key {
163 uint64_t sequence_id;
164 sa_family_t af;
165 uint16_t port;
166 union {
167 uint8_t ip4[4];
168 uint8_t ip6[16];
169 };
170};
171
172struct nmsg_seqsrc {
173 ISC_LINK(struct nmsg_seqsrc) link;
174 struct nmsg_seqsrc_key key;
175 uint32_t sequence;
176 uint64_t sequence_id;
177 uint64_t count;
178 uint64_t count_dropped;
179 time_t last;
180 bool init;
181 char addr_str[INET6_ADDRSTRLEN];
182};
183
184/* nmsg_frag: used by nmsg_stream_input */
185struct nmsg_frag_key {
186 uint32_t id;
187 uint32_t crc;
188 struct sockaddr_storage addr_ss;
189};
190
191struct nmsg_frag {
192 RB_ENTRY(nmsg_frag) link;
193 struct nmsg_frag_key key;
194 unsigned last;
195 unsigned rem;
196 struct timespec ts;
197 ProtobufCBinaryData *frags;
198};
199
200/* nmsg_frag_tree: used by nmsg_stream_input */
201struct nmsg_frag_tree {
202 RB_HEAD(frag_ent, nmsg_frag) head;
203};
204
205/* nmsg_buf: used by nmsg_stream_input, nmsg_stream_output */
206struct nmsg_buf {
207 int fd;
208 size_t bufsz;
209 u_char *data; /* allocated data starts here */
210 u_char *pos; /* position of next buffer read */
211 u_char *end; /* one byte beyond valid data */
212};
213
214/* nmsg_pcap: used by nmsg_input */
215struct nmsg_pcap {
216 int datalink;
217 pcap_t *handle;
218 struct _nmsg_ipreasm *reasm;
219 u_char *new_pkt;
220
221 pcap_t *user;
222 char *userbpft;
223 struct bpf_program userbpf;
224
225 nmsg_pcap_type type;
226 bool raw;
227};
228
229/* nmsg_pres: used by nmsg_input and nmsg_output */
230struct nmsg_pres {
231 pthread_mutex_t lock;
232 FILE *fp;
233 bool flush;
234 char *endline;
235};
236
237/* nmsg_json: used by nmsg_input and nmsg_output */
238struct nmsg_json {
239#ifdef HAVE_JSON_C
240#endif /* HAVE_JSON_C */
241 pthread_mutex_t lock;
242 FILE *fp;
243 int orig_fd;
244 bool flush;
245};
246
247/* nmsg_stream_input: used by nmsg_input */
248struct nmsg_stream_input {
249 nmsg_stream_type type;
250 struct nmsg_buf *buf;
251#ifdef HAVE_LIBZMQ
252 void *zmq;
253#endif /* HAVE_LIBZMQ */
254 Nmsg__Nmsg *nmsg;
255 unsigned np_index;
256 size_t nc_size;
257 struct nmsg_frag_tree nft;
258 struct pollfd pfd;
259 struct timespec now;
260 struct timespec lastgc;
261 unsigned nfrags;
262 unsigned flags;
263 nmsg_zbuf_t zb;
264 u_char *zb_tmp;
265 unsigned source;
266 unsigned operator;
267 unsigned group;
268 bool blocking_io;
269 bool verify_seqsrc;
270 struct nmsg_brate *brate;
271 ISC_LIST(struct nmsg_seqsrc) seqsrcs;
272 struct sockaddr_storage addr_ss;
273 uint64_t count_recv;
274 uint64_t count_drop;
275
276 nmsg_input_stream_read_fp stream_read_fp;
277};
278
279/* nmsg_stream_output: used by nmsg_output */
280struct nmsg_stream_output {
281 pthread_mutex_t c_lock; /* Container lock. */
282 pthread_mutex_t w_lock; /* Write/Send lock. */
283 nmsg_stream_type type;
284 int fd;
285#ifdef HAVE_LIBZMQ
286 void *zmq;
287#endif /* HAVE_LIBZMQ */
288 nmsg_container_t c;
289 size_t bufsz;
290 nmsg_random_t random;
291 nmsg_rate_t rate;
292 bool buffered;
293 unsigned source;
294 unsigned operator;
295 unsigned group;
296 bool do_zlib;
297 bool do_sequence;
298 uint32_t sequence;
299 uint64_t sequence_id;
300};
301
302/* nmsg_callback_output: used by nmsg_output */
303struct nmsg_callback_output {
304 nmsg_cb_message cb;
305 void *user;
306};
307
308/* nmsg_callback_input: used by nmsg_input */
309struct nmsg_callback_input {
310 nmsg_cb_message_read cb;
311 void *user;
312};
313
314/* nmsg_input */
315struct nmsg_input {
316 nmsg_input_type type;
317 nmsg_msgmod_t msgmod;
318 void *clos;
319 union {
320 struct nmsg_stream_input *stream;
321 struct nmsg_pcap *pcap;
322 struct nmsg_pres *pres;
323 struct nmsg_json *json;
324 struct nmsg_callback_input *callback;
325 };
326 nmsg_input_read_fp read_fp;
327 nmsg_input_read_loop_fp read_loop_fp;
328
329 bool do_filter;
330 unsigned filter_vid;
331 unsigned filter_msgtype;
332 volatile bool stop;
333};
334
335/* nmsg_output */
336struct nmsg_output {
337 nmsg_output_type type;
338 union {
339 struct nmsg_stream_output *stream;
340 struct nmsg_pres *pres;
341 struct nmsg_json *json;
342 struct nmsg_callback_output *callback;
343 };
344 nmsg_output_write_fp write_fp;
345 nmsg_output_flush_fp flush_fp;
346
347 bool do_filter;
348 unsigned filter_vid;
349 unsigned filter_msgtype;
350 volatile bool stop;
351};
352
353/* nmsg_message */
354struct nmsg_message {
355 nmsg_msgmod_t mod;
356 ProtobufCMessage *message;
357 Nmsg__NmsgPayload *np;
358 void *msg_clos;
359 size_t n_allocs;
360 void **allocs;
361 bool updated;
362};
363
391/* dlmod / msgmod / msgmodset */
392
393struct nmsg_dlmod {
394 ISC_LINK(struct nmsg_dlmod) link;
395 char *path;
396 void *handle;
397};
398
399typedef enum nmsg_msgmod_clos_mode {
400 nmsg_msgmod_clos_m_keyval,
401 nmsg_msgmod_clos_m_multiline
402} nmsg_msgmod_clos_mode;
403
404struct nmsg_msgmod_clos {
405 char *nmsg_pbuf;
406 size_t estsz;
407 nmsg_msgmod_clos_mode mode;
408 struct nmsg_msgmod_field *field;
409 struct nmsg_strbuf *strbufs;
410 void *mod_clos;
411};
412
413struct nmsg_msgvendor {
414 struct nmsg_msgmod **msgtypes;
415 char *vname;
416 size_t nm;
417};
418
419struct nmsg_msgmod {
420 struct nmsg_msgmod_plugin *plugin;
421 struct nmsg_msgmod_field *fields;
422 struct nmsg_msgmod_field **fields_idx;
423 size_t n_fields;
424};
425
426struct nmsg_msgmodset {
427 ISC_LIST(struct nmsg_dlmod) dlmods;
428 struct nmsg_msgvendor **vendors;
429 size_t nv;
430};
431
432/* internal nmsg_strbuf wrapper to use expensive stack allocation by default */
433struct nmsg_strbuf_storage {
434 struct nmsg_strbuf sb;
435 char fixed[DEFAULT_STRBUF_ALLOC_SZ];
436};
437
438/* Prototypes. */
439
440/* from alias.c */
441
442nmsg_res _nmsg_alias_init(void);
443void _nmsg_alias_fini(void);
444
445/* from buf.c */
446
447ssize_t _nmsg_buf_avail(struct nmsg_buf *buf);
448ssize_t _nmsg_buf_used(struct nmsg_buf *buf);
449struct nmsg_buf * _nmsg_buf_new(size_t sz);
450void _nmsg_buf_destroy(struct nmsg_buf **buf);
451void _nmsg_buf_reset(struct nmsg_buf *buf);
452
453/* from dlmod.c */
454
455struct nmsg_dlmod * _nmsg_dlmod_init(const char *path);
456void _nmsg_dlmod_destroy(struct nmsg_dlmod **dlmod);
457
458/* from msgmod.c */
459
460struct nmsg_msgmod * _nmsg_msgmod_start(struct nmsg_msgmod_plugin *plugin);
461void _nmsg_msgmod_stop(struct nmsg_msgmod **mod);
462
463/* from message.c */
464
465nmsg_res _nmsg_message_init_message(struct nmsg_message *msg);
466nmsg_res _nmsg_message_init_payload(struct nmsg_message *msg);
467nmsg_res _nmsg_message_deserialize(struct nmsg_message *msg);
468nmsg_res _nmsg_message_serialize(struct nmsg_message *msg);
469nmsg_message_t _nmsg_message_from_payload(Nmsg__NmsgPayload *np);
470nmsg_message_t _nmsg_message_dup(struct nmsg_message *msg);
471nmsg_res _nmsg_message_dup_protobuf(const struct nmsg_message *msg, ProtobufCMessage **dst);
472nmsg_res _nmsg_message_to_json(nmsg_message_t msg, struct nmsg_strbuf *sb);
473
474/* from msgmodset.c */
475
476struct nmsg_msgmodset * _nmsg_msgmodset_init(const char *path);
477void _nmsg_msgmodset_destroy(struct nmsg_msgmodset **);
478
479/* from strbuf.c */
480struct nmsg_strbuf * _nmsg_strbuf_init(struct nmsg_strbuf_storage *sbs);
481void _nmsg_strbuf_destroy(struct nmsg_strbuf_storage *sbs);
482nmsg_res _nmsg_strbuf_expand(struct nmsg_strbuf *sb, size_t size);
483char * _nmsg_strbuf_detach(struct nmsg_strbuf *size);
484
485/* from payload.c */
486void _nmsg_payload_free_all(Nmsg__Nmsg *nc);
487void _nmsg_payload_free_crcs(Nmsg__Nmsg *nc);
488void _nmsg_payload_calc_crcs(Nmsg__Nmsg *nc);
489void _nmsg_payload_free(Nmsg__NmsgPayload **np);
490size_t _nmsg_payload_size(const Nmsg__NmsgPayload *np);
491
492/* from input_frag.c */
493nmsg_res _input_frag_read(nmsg_input_t, Nmsg__Nmsg **, uint8_t *buf, size_t buf_len);
494void _input_frag_destroy(struct nmsg_stream_input *);
495void _input_frag_gc(struct nmsg_stream_input *);
496
497/* from input_nmsg.c */
498bool _input_nmsg_filter(nmsg_input_t, unsigned, Nmsg__NmsgPayload *);
499nmsg_res _input_nmsg_read(nmsg_input_t, nmsg_message_t *);
500nmsg_res _input_nmsg_loop(nmsg_input_t, int, nmsg_cb_message, void *);
501nmsg_res _input_nmsg_unpack_container(nmsg_input_t, Nmsg__Nmsg **, uint8_t *, size_t);
502nmsg_res _input_nmsg_unpack_container2(const uint8_t *, size_t, unsigned, Nmsg__Nmsg **);
503nmsg_res _input_nmsg_read_container_file(nmsg_input_t, Nmsg__Nmsg **);
504nmsg_res _input_nmsg_read_container_sock(nmsg_input_t, Nmsg__Nmsg **);
505#ifdef HAVE_LIBZMQ
506nmsg_res _input_nmsg_read_container_zmq(nmsg_input_t, Nmsg__Nmsg **);
507#endif /* HAVE_LIBZMQ */
508nmsg_res _input_nmsg_deserialize_header(const uint8_t *, size_t, ssize_t *, unsigned *);
509
510/* from input_callback.c */
511nmsg_res _input_nmsg_read_callback(nmsg_input_t, nmsg_message_t *);
512
513/* from input_nullnmsg.c */
514nmsg_res _input_nmsg_read_null(nmsg_input_t, nmsg_message_t *);
515nmsg_res _input_nmsg_loop_null(nmsg_input_t, int, nmsg_cb_message, void *);
516
517/* from input_pcap.c */
518nmsg_res _input_pcap_read(nmsg_input_t, nmsg_message_t *);
519nmsg_res _input_pcap_read_raw(nmsg_input_t, nmsg_message_t *);
520
521/* from input_pres.c */
522nmsg_res _input_pres_read(nmsg_input_t, nmsg_message_t *);
523
524/* from input_json.c */
525nmsg_res _input_json_read(nmsg_input_t, nmsg_message_t *);
526
527/* from input_seqsrc.c */
528struct nmsg_seqsrc * _input_seqsrc_get(nmsg_input_t, Nmsg__Nmsg *);
529void _input_seqsrc_destroy(nmsg_input_t);
530size_t _input_seqsrc_update(nmsg_input_t, struct nmsg_seqsrc *, Nmsg__Nmsg *);
531
532/* from output.c */
533void _output_stop(nmsg_output_t);
534
535/* from output_nmsg.c */
536nmsg_res _output_nmsg_flush(nmsg_output_t);
537nmsg_res _output_nmsg_write(nmsg_output_t, nmsg_message_t);
538
539/* from output_pres.c */
540nmsg_res _output_pres_write(nmsg_output_t, nmsg_message_t);
541
542/* from output_json.c */
543nmsg_res _output_json_write(nmsg_output_t, nmsg_message_t);
544
545/* from brate.c */
546struct nmsg_brate * _nmsg_brate_init(size_t target_byte_rate);
547void _nmsg_brate_destroy(struct nmsg_brate **);
548void _nmsg_brate_sleep(struct nmsg_brate *, size_t container_sz, size_t n_payloads, size_t n);
549
550/* from ipdg.c */
551
596nmsg_res
597_nmsg_ipdg_parse_reasm(struct nmsg_ipdg *dg, unsigned etype, size_t len,
598 const u_char *pkt, struct _nmsg_ipreasm *reasm,
599 unsigned *new_len, u_char *new_pkt, int *defrag,
600 uint64_t timestamp);
601
602#endif /* NMSG_PRIVATE_H */