21#include "nmsg_port_net.h"
26# ifdef HAVE_SYS_ENDIAN_H
27# include <sys/endian.h>
32#include <sys/socket.h>
58#include <protobuf-c/protobuf-c.h>
71#include "fltmod_plugin.h"
72#include "msgmod_plugin.h"
76#include "libmy/crc32c.h"
77#include "libmy/list.h"
78#include "libmy/tree.h"
79#include "libmy/ubuf.h"
80#include "libmy/b64_decode.h"
81#include "libmy/b64_encode.h"
82#include "libmy/vector.h"
83#include "libmy/fast_inet_ntop.h"
/*
 * Garbage-collection intervals for sequence sources and fragments.
 * NOTE(review): the unit is not stated here (presumably seconds) -- confirm
 * against the code that compares these values.
 */
90#define NMSG_SEQSRC_GC_INTERVAL 120
91#define NMSG_FRAG_GC_INTERVAL 30
/*
 * Number of nanoseconds in one second.
 * NOTE(review): this is a plain int constant; products with it may need a
 * wider type at the use site -- confirm.
 */
92#define NMSG_NSEC_PER_SEC 1000000000
/* Size in bytes of the fixed char array inside struct nmsg_strbuf_storage. */
94#define DEFAULT_STRBUF_ALLOC_SZ 16384
/*
 * Name prefixes for filter and message modules, each followed by the
 * expansion of XSTR() on the corresponding module version macro.
 * NOTE(review): XSTR is defined elsewhere; presumably it stringifies its
 * argument -- confirm.
 */
96#define NMSG_FLT_MODULE_PREFIX "nmsg_flt" XSTR(NMSG_FLTMOD_VERSION)
97#define NMSG_MSG_MODULE_PREFIX "nmsg_msg" XSTR(NMSG_MSGMOD_VERSION)
/* File-name suffix used for module files. */
99#define NMSG_MODULE_SUFFIX ".so"
/*
 * Debug print helpers. Each writes a printf-style message to stderr when a
 * verbosity value is at least `level`:
 *   _nmsg_dprintf  compares the global _nmsg_global_debug;
 *   _nmsg_dprintfv compares the caller-supplied `var`.
 * `##__VA_ARGS__` is the GNU extension that removes the preceding comma when
 * no variadic arguments are passed.
 * NOTE(review): only the `if` and `fprintf` lines of each macro are visible
 * in this view; confirm the full definitions wrap them (e.g. in
 * do { ... } while (0)) so the macros are safe inside unbraced if/else.
 */
101#define _nmsg_dprintf(level, format, ...) \
103 if (_nmsg_global_debug >= (level)) \
104 fprintf(stderr, format, ##__VA_ARGS__); \
107#define _nmsg_dprintfv(var, level, format, ...) \
109 if ((var) >= (level)) \
110 fprintf(stderr, format, ##__VA_ARGS__); \
/*
 * Enumerators for the kinds of stream: file, socket, ZMQ and null.
 * NOTE(review): the enum header is not visible in this view; presumably this
 * is the nmsg_stream_type used by the stream input/output structs -- confirm.
 */
116 nmsg_stream_type_file,
117 nmsg_stream_type_sock,
118 nmsg_stream_type_zmq,
119 nmsg_stream_type_null,
/*
 * Forward declarations, so that pointers to these structs can be used in
 * declarations that appear before (or without) the full definitions.
 */
126struct nmsg_container;
130struct nmsg_frag_tree;
135struct nmsg_msgmod_field;
136struct nmsg_msgmod_clos;
139struct nmsg_stream_input;
140struct nmsg_stream_output;
142struct nmsg_seqsrc_key;
/*
 * Library-wide globals; declared here, defined elsewhere.
 * _nmsg_global_debug is the verbosity level tested by _nmsg_dprintf().
 * NOTE(review): the meaning of _nmsg_global_autoclose and the ownership of
 * _nmsg_global_msgmodset are not visible here -- see their definitions.
 */
146extern bool _nmsg_global_autoclose;
147extern int _nmsg_global_debug;
148extern struct nmsg_msgmodset * _nmsg_global_msgmodset;
/*
 * Function-pointer types for per-object dispatch. All return nmsg_res.
 */
/* Input read: receives the input object and a location for one message. */
152typedef nmsg_res (*nmsg_input_read_fp)(
struct nmsg_input *, nmsg_message_t *);
/*
 * Input read loop: receives the input object, an int, a message callback and
 * an opaque pointer.
 * NOTE(review): the parameters are unnamed; presumably the int is a count and
 * the pointer is passed through to the callback -- confirm.
 */
153typedef nmsg_res (*nmsg_input_read_loop_fp)(
struct nmsg_input *, int,
154 nmsg_cb_message,
void *);
/* Stream read: receives the input object and a location for an Nmsg__Nmsg pointer. */
155typedef nmsg_res (*nmsg_input_stream_read_fp)(
struct nmsg_input *, Nmsg__Nmsg **);
/* Output write: receives the output object and one message. */
156typedef nmsg_res (*nmsg_output_write_fp)(
struct nmsg_output *, nmsg_message_t);
/* Output flush: receives the output object only. */
157typedef nmsg_res (*nmsg_output_flush_fp)(
struct nmsg_output *);
/*
 * Key for a sequence source. Only the 64-bit sequence_id member is visible in
 * this view; other members may exist.
 */
162struct nmsg_seqsrc_key {
163 uint64_t sequence_id;
/*
 * NOTE(review): the members below appear to belong to struct nmsg_seqsrc,
 * whose header is not visible in this view -- confirm. Visible members: a
 * list linkage to other struct nmsg_seqsrc entries, the key, a 64-bit
 * sequence_id, a 64-bit count_dropped counter, and a text buffer sized
 * INET6_ADDRSTRLEN for an address string.
 */
173 ISC_LINK(
struct nmsg_seqsrc) link;
174 struct nmsg_seqsrc_key key;
176 uint64_t sequence_id;
178 uint64_t count_dropped;
181 char addr_str[INET6_ADDRSTRLEN];
/*
 * Key for a fragment entry. Only the socket-address member is visible in this
 * view; other members may exist.
 */
185struct nmsg_frag_key {
188 struct sockaddr_storage addr_ss;
/*
 * NOTE(review): the members below appear to belong to struct nmsg_frag, whose
 * header is not visible in this view -- confirm. Visible members: the
 * red-black tree linkage, the key, and a pointer to ProtobufCBinaryData
 * (presumably an array of fragments -- confirm).
 */
192 RB_ENTRY(nmsg_frag) link;
193 struct nmsg_frag_key key;
197 ProtobufCBinaryData *frags;
/* Red-black tree head (named frag_ent) whose entries are struct nmsg_frag. */
201struct nmsg_frag_tree {
202 RB_HEAD(frag_ent, nmsg_frag) head;
/*
 * NOTE(review): these members belong to structs whose headers are not visible
 * in this view; the two `lock` members are in different structs in the
 * original file -- confirm which. Visible: a pointer to an IP reassembly
 * state (struct _nmsg_ipreasm), a struct bpf_program held by value, and two
 * pthread mutexes.
 */
218 struct _nmsg_ipreasm *reasm;
223 struct bpf_program userbpf;
231 pthread_mutex_t lock;
241 pthread_mutex_t lock;
/*
 * State for a stream input. Only some members are visible in this view.
 */
248struct nmsg_stream_input {
/* Which kind of stream this is (nmsg_stream_type). */
249 nmsg_stream_type type;
/* Buffer object; see the _nmsg_buf_* functions. */
250 struct nmsg_buf *buf;
/* Fragment tree, embedded by value. */
257 struct nmsg_frag_tree nft;
/* NOTE(review): presumably the time of the last garbage-collection pass -- confirm. */
260 struct timespec lastgc;
/* Byte-rate object; see the _nmsg_brate_* functions. */
270 struct nmsg_brate *brate;
/* List of struct nmsg_seqsrc entries. */
271 ISC_LIST(
struct nmsg_seqsrc) seqsrcs;
/* Socket address storage. NOTE(review): whose address this is, is not visible here. */
272 struct sockaddr_storage addr_ss;
/* Function used to read an Nmsg__Nmsg from the stream. */
276 nmsg_input_stream_read_fp stream_read_fp;
/*
 * State for a stream output. Only some members are visible in this view.
 */
280struct nmsg_stream_output {
/*
 * Two separate mutexes.
 * NOTE(review): what each one protects is not visible here (the names suggest
 * container vs. write) -- confirm at the lock/unlock sites.
 */
281 pthread_mutex_t c_lock;
282 pthread_mutex_t w_lock;
/* Which kind of stream this is (nmsg_stream_type). */
283 nmsg_stream_type type;
/* Random-number object handle. */
290 nmsg_random_t random;
/* 64-bit sequence identifier. */
299 uint64_t sequence_id;
/*
 * Callback-based output and input state. The members of
 * struct nmsg_callback_output are not visible in this view.
 */
303struct nmsg_callback_output {
309struct nmsg_callback_input {
/* Callback of type nmsg_cb_message_read. */
310 nmsg_cb_message_read cb;
/*
 * NOTE(review): these members appear to belong to the input object
 * (struct nmsg_input), whose header is not visible in this view -- confirm.
 */
/* Kind of input, and a message-module handle. */
316 nmsg_input_type type;
317 nmsg_msgmod_t msgmod;
/*
 * Pointers to type-specific state, one per input kind.
 * NOTE(review): presumably only the one matching `type` is used, possibly as
 * members of a union -- confirm.
 */
320 struct nmsg_stream_input *stream;
321 struct nmsg_pcap *pcap;
322 struct nmsg_pres *pres;
323 struct nmsg_json *json;
324 struct nmsg_callback_input *callback;
/* Dispatch pointers for reading one message and for the read loop. */
326 nmsg_input_read_fp read_fp;
327 nmsg_input_read_loop_fp read_loop_fp;
/* Message type value used for filtering. */
331 unsigned filter_msgtype;
/*
 * NOTE(review): these members appear to belong to the output object
 * (struct nmsg_output), whose header is not visible in this view -- confirm.
 */
/* Kind of output. */
337 nmsg_output_type type;
/*
 * Pointers to type-specific state, one per output kind.
 * NOTE(review): presumably only the one matching `type` is used -- confirm.
 */
339 struct nmsg_stream_output *stream;
340 struct nmsg_pres *pres;
341 struct nmsg_json *json;
342 struct nmsg_callback_output *callback;
/* Dispatch pointers for writing one message and for flushing. */
344 nmsg_output_write_fp write_fp;
345 nmsg_output_flush_fp flush_fp;
/* Message type value used for filtering. */
349 unsigned filter_msgtype;
/*
 * NOTE(review): the first two members appear to belong to struct nmsg_message
 * (header not visible in this view): a generic protobuf-c message pointer and
 * an Nmsg__NmsgPayload pointer. How the two relate is not visible here.
 */
356 ProtobufCMessage *message;
357 Nmsg__NmsgPayload *np;
/*
 * NOTE(review): list linkage to other struct nmsg_dlmod entries; presumably a
 * member of struct nmsg_dlmod itself (header not visible) -- confirm.
 */
394 ISC_LINK(
struct nmsg_dlmod) link;
/*
 * Mode stored in struct nmsg_msgmod_clos: key/value or multiline.
 * NOTE(review): the names suggest these are parsing states -- confirm where
 * `mode` is read.
 */
399typedef enum nmsg_msgmod_clos_mode {
400 nmsg_msgmod_clos_m_keyval,
401 nmsg_msgmod_clos_m_multiline
402} nmsg_msgmod_clos_mode;
/*
 * Per-message-module closure state. Only some members are visible in this
 * view: the current mode, a pointer to a field descriptor, and a pointer to
 * string buffers (presumably an array -- confirm).
 */
404struct nmsg_msgmod_clos {
407 nmsg_msgmod_clos_mode mode;
408 struct nmsg_msgmod_field *field;
409 struct nmsg_strbuf *strbufs;
/*
 * A message vendor: holds pointers to message modules in `msgtypes`.
 * NOTE(review): presumably an array indexed by message type -- confirm.
 */
413struct nmsg_msgvendor {
414 struct nmsg_msgmod **msgtypes;
/*
 * NOTE(review): the members below appear to belong to struct nmsg_msgmod,
 * whose header is not visible in this view -- confirm. Visible members: the
 * plugin descriptor, a pointer to field descriptors, and a pointer to field
 * descriptor pointers (presumably an index into `fields` -- confirm).
 */
420 struct nmsg_msgmod_plugin *plugin;
421 struct nmsg_msgmod_field *fields;
422 struct nmsg_msgmod_field **fields_idx;
/*
 * Set of message modules. Visible members: a list of struct nmsg_dlmod
 * entries and a pointer to vendor pointers.
 * NOTE(review): presumably `vendors` is an array indexed by vendor id;
 * any size/count member is not visible in this view -- confirm.
 */
426struct nmsg_msgmodset {
427 ISC_LIST(
struct nmsg_dlmod) dlmods;
428 struct nmsg_msgvendor **vendors;
/*
 * A string buffer bundled with a fixed char array of
 * DEFAULT_STRBUF_ALLOC_SZ bytes; passed to _nmsg_strbuf_init() and
 * _nmsg_strbuf_destroy().
 * NOTE(review): presumably `fixed` is the initial backing store for `sb`, so
 * that small strings need no heap allocation -- confirm in _nmsg_strbuf_init().
 */
433struct nmsg_strbuf_storage {
434 struct nmsg_strbuf sb;
435 char fixed[DEFAULT_STRBUF_ALLOC_SZ];
/*
 * Alias subsystem: initialization (returns nmsg_res) and finalization.
 * Neither takes arguments. Defined elsewhere.
 */
442nmsg_res _nmsg_alias_init(
void);
443void _nmsg_alias_fini(
void);
/*
 * struct nmsg_buf helpers. Defined elsewhere.
 */
/* Queries on a buffer; both return ssize_t. NOTE(review): the units (presumably bytes) are not visible here. */
447ssize_t _nmsg_buf_avail(
struct nmsg_buf *buf);
448ssize_t _nmsg_buf_used(
struct nmsg_buf *buf);
/* Creates a buffer from a size and returns a pointer to it. */
449struct nmsg_buf * _nmsg_buf_new(
size_t sz);
/*
 * Destroys a buffer; takes the address of the caller's pointer.
 * NOTE(review): presumably so the pointer can be cleared -- confirm.
 */
450void _nmsg_buf_destroy(
struct nmsg_buf **buf);
/* Resets a buffer. */
451void _nmsg_buf_reset(
struct nmsg_buf *buf);
/*
 * Dynamically loaded module handles. _nmsg_dlmod_init() takes a path and
 * returns a struct nmsg_dlmod pointer; _nmsg_dlmod_destroy() takes the
 * address of such a pointer. Defined elsewhere.
 * NOTE(review): failure behavior (e.g. NULL return) is not visible here.
 */
455struct nmsg_dlmod * _nmsg_dlmod_init(
const char *path);
456void _nmsg_dlmod_destroy(
struct nmsg_dlmod **dlmod);
/*
 * Message module lifecycle. _nmsg_msgmod_start() takes a plugin descriptor
 * and returns a struct nmsg_msgmod pointer; _nmsg_msgmod_stop() takes the
 * address of such a pointer. Defined elsewhere.
 */
460struct nmsg_msgmod * _nmsg_msgmod_start(
struct nmsg_msgmod_plugin *plugin);
461void _nmsg_msgmod_stop(
struct nmsg_msgmod **mod);
/*
 * struct nmsg_message helpers. Defined elsewhere.
 */
/* Operations on an existing message object; each returns nmsg_res. */
465nmsg_res _nmsg_message_init_message(
struct nmsg_message *msg);
466nmsg_res _nmsg_message_init_payload(
struct nmsg_message *msg);
467nmsg_res _nmsg_message_deserialize(
struct nmsg_message *msg);
468nmsg_res _nmsg_message_serialize(
struct nmsg_message *msg);
/*
 * Builds a message handle from a payload.
 * NOTE(review): whether ownership of `np` transfers to the message is not
 * visible here -- confirm.
 */
469nmsg_message_t _nmsg_message_from_payload(Nmsg__NmsgPayload *np);
/* Duplicates a message and returns the new handle. */
470nmsg_message_t _nmsg_message_dup(
struct nmsg_message *msg);
/* Duplicates the protobuf of `msg`; the result is stored through `dst`. */
471nmsg_res _nmsg_message_dup_protobuf(
const struct nmsg_message *msg, ProtobufCMessage **dst);
/* Produces JSON for `msg` using the string buffer `sb`. */
472nmsg_res _nmsg_message_to_json(nmsg_message_t msg,
struct nmsg_strbuf *sb);
/*
 * Message module set lifecycle. _nmsg_msgmodset_init() takes a path and
 * returns a struct nmsg_msgmodset pointer; _nmsg_msgmodset_destroy() takes
 * the address of such a pointer. Defined elsewhere.
 * NOTE(review): presumably `path` is the directory searched for modules --
 * confirm.
 */
476struct nmsg_msgmodset * _nmsg_msgmodset_init(
const char *path);
477void _nmsg_msgmodset_destroy(
struct nmsg_msgmodset **);
/*
 * String buffer helpers. Defined elsewhere.
 */
/* Initializes the given storage and returns a struct nmsg_strbuf pointer. */
480struct nmsg_strbuf * _nmsg_strbuf_init(
struct nmsg_strbuf_storage *sbs);
/* Releases the given storage. */
481void _nmsg_strbuf_destroy(
struct nmsg_strbuf_storage *sbs);
/* Expands `sb` according to `size`; returns nmsg_res. */
482nmsg_res _nmsg_strbuf_expand(
struct nmsg_strbuf *sb,
size_t size);
/*
 * Detaches and returns a char pointer from the string buffer.
 * NOTE(review): the parameter is named `size` although it is the
 * struct nmsg_strbuf pointer; the sibling declarations call it `sb`.
 * Consider renaming for consistency. Presumably the caller owns the returned
 * string -- confirm.
 */
483char * _nmsg_strbuf_detach(
struct nmsg_strbuf *size);
/*
 * Payload helpers. Defined elsewhere.
 */
/* Operations over an Nmsg__Nmsg container: free all payloads, free CRCs, calculate CRCs. */
486void _nmsg_payload_free_all(Nmsg__Nmsg *nc);
487void _nmsg_payload_free_crcs(Nmsg__Nmsg *nc);
488void _nmsg_payload_calc_crcs(Nmsg__Nmsg *nc);
/* Frees one payload; takes the address of the caller's pointer. */
489void _nmsg_payload_free(Nmsg__NmsgPayload **np);
/* Returns a size for a payload without modifying it (const argument). */
490size_t _nmsg_payload_size(
const Nmsg__NmsgPayload *np);
/*
 * Fragment handling for stream inputs. Defined elsewhere.
 */
/*
 * Reads a fragment from `buf` (length `buf_len`) for the given input; an
 * Nmsg__Nmsg pointer can be returned through the second argument.
 * NOTE(review): when the output pointer is set (e.g. only once reassembly
 * completes) is not visible here -- confirm.
 */
493nmsg_res _input_frag_read(nmsg_input_t, Nmsg__Nmsg **, uint8_t *buf,
size_t buf_len);
/* Tear-down and garbage collection of fragment state on a stream input. */
494void _input_frag_destroy(
struct nmsg_stream_input *);
495void _input_frag_gc(
struct nmsg_stream_input *);
/*
 * NMSG input functions. Defined elsewhere. Most parameters are unnamed in
 * these declarations; descriptions of them are limited to their types.
 */
/* Filter predicate over an input, an unsigned value and a payload. */
498bool _input_nmsg_filter(nmsg_input_t,
unsigned, Nmsg__NmsgPayload *);
/* Same signatures as nmsg_input_read_fp and nmsg_input_read_loop_fp, apart from taking nmsg_input_t rather than struct nmsg_input *. */
499nmsg_res _input_nmsg_read(nmsg_input_t, nmsg_message_t *);
500nmsg_res _input_nmsg_loop(nmsg_input_t,
int, nmsg_cb_message,
void *);
/*
 * Container unpacking from a byte buffer and length; the result is returned
 * through the Nmsg__Nmsg ** argument. The second variant takes a const
 * buffer and an extra unsigned value instead of an input.
 * NOTE(review): presumably the unsigned value carries header flags -- confirm.
 */
501nmsg_res _input_nmsg_unpack_container(nmsg_input_t, Nmsg__Nmsg **, uint8_t *,
size_t);
502nmsg_res _input_nmsg_unpack_container2(
const uint8_t *,
size_t,
unsigned, Nmsg__Nmsg **);
/* Container readers, one per stream kind; same signatures as nmsg_input_stream_read_fp, apart from taking nmsg_input_t. */
503nmsg_res _input_nmsg_read_container_file(nmsg_input_t, Nmsg__Nmsg **);
504nmsg_res _input_nmsg_read_container_sock(nmsg_input_t, Nmsg__Nmsg **);
506nmsg_res _input_nmsg_read_container_zmq(nmsg_input_t, Nmsg__Nmsg **);
/*
 * Header deserialization from a const byte buffer and length; writes an
 * ssize_t and an unsigned through the output pointers.
 * NOTE(review): presumably message size and flags -- confirm.
 */
508nmsg_res _input_nmsg_deserialize_header(
const uint8_t *,
size_t, ssize_t *,
unsigned *);
/* Read functions for callback and null inputs. */
511nmsg_res _input_nmsg_read_callback(nmsg_input_t, nmsg_message_t *);
514nmsg_res _input_nmsg_read_null(nmsg_input_t, nmsg_message_t *);
515nmsg_res _input_nmsg_loop_null(nmsg_input_t,
int, nmsg_cb_message,
void *);
/*
 * Read functions for pcap, presentation-format and JSON inputs. All share
 * the same signature: an input and a location for one message. Defined
 * elsewhere.
 * NOTE(review): how the _raw pcap variant differs is not visible here.
 */
518nmsg_res _input_pcap_read(nmsg_input_t, nmsg_message_t *);
519nmsg_res _input_pcap_read_raw(nmsg_input_t, nmsg_message_t *);
522nmsg_res _input_pres_read(nmsg_input_t, nmsg_message_t *);
525nmsg_res _input_json_read(nmsg_input_t, nmsg_message_t *);
/*
 * Sequence-source tracking for an input. Defined elsewhere.
 */
/* Returns the sequence source for an input and container. NOTE(review): whether it creates one on a miss is not visible here. */
528struct nmsg_seqsrc * _input_seqsrc_get(nmsg_input_t, Nmsg__Nmsg *);
/* Destroys sequence-source state for an input. */
529void _input_seqsrc_destroy(nmsg_input_t);
/*
 * Updates a sequence source from a container and returns a size_t.
 * NOTE(review): presumably the number of dropped containers -- confirm.
 */
530size_t _input_seqsrc_update(nmsg_input_t,
struct nmsg_seqsrc *, Nmsg__Nmsg *);
/*
 * Output functions. Defined elsewhere.
 */
/* Stops an output. */
533void _output_stop(nmsg_output_t);
/* NMSG output: same signatures as nmsg_output_flush_fp and nmsg_output_write_fp, apart from taking nmsg_output_t. */
536nmsg_res _output_nmsg_flush(nmsg_output_t);
537nmsg_res _output_nmsg_write(nmsg_output_t, nmsg_message_t);
/* Write functions for presentation-format and JSON outputs. */
540nmsg_res _output_pres_write(nmsg_output_t, nmsg_message_t);
543nmsg_res _output_json_write(nmsg_output_t, nmsg_message_t);
/*
 * Byte-rate objects. Defined elsewhere.
 */
/* Creates a byte-rate object from a target byte rate. NOTE(review): the time unit of the rate is not visible here. */
546struct nmsg_brate * _nmsg_brate_init(
size_t target_byte_rate);
/* Destroys a byte-rate object; takes the address of the caller's pointer. */
547void _nmsg_brate_destroy(
struct nmsg_brate **);
/*
 * Sleeps according to the byte-rate object, given a container size, a
 * payload count and `n`.
 * NOTE(review): the meaning of `n` is not visible here -- confirm in the
 * definition.
 */
548void _nmsg_brate_sleep(
struct nmsg_brate *,
size_t container_sz,
size_t n_payloads,
size_t n);
597_nmsg_ipdg_parse_reasm(
struct nmsg_ipdg *dg,
unsigned etype,
size_t len,
598 const u_char *pkt,
struct _nmsg_ipreasm *reasm,
599 unsigned *new_len, u_char *new_pkt,
int *defrag,