NATS JetStream Sink

Overview

NATS is a high-performance, lightweight messaging system. NATS JetStream is the built-in streaming layer for NATS that provides durable, replayable message streams with advanced features like message acknowledgment, persistence, and consumer management.

Timeplus provides first-class integration for NATS JetStream as a new type of External Stream. You can read or write data in NATS JetStream using SQL queries, similar to how you work with Kafka or Pulsar external streams.

Create NATS JetStream External Stream

Use the following SQL command to create a NATS JetStream external stream:

CREATE EXTERNAL STREAM [IF NOT EXISTS] <stream_name>
(<col_name1> <col_type>)
SETTINGS
type='nats_jetstream', -- required
url='nats://host:port', -- required
stream_name='..', -- required
subject='..', -- required
consumer_stall_timeout_ms=..,
username='..',
password='..',
token='..',
secure=<true|false>,
ssl_ca_cert_file='..',
skip_ssl_cert_check=<true|false>,
ssl_cert_file='..',
ssl_key_file='..',
data_format='..',
format_schema='..',
one_message_per_row=..,
config_file='..',
named_collection='..';

Settings

type

Must be set to nats_jetstream.

url

The NATS server URL. Example: nats://localhost:4222

stream_name

The name of the JetStream stream to connect to. The stream must already exist on the NATS server: Timeplus checks for it when the external stream is created.

subject

The NATS subject to subscribe to or publish messages on. The wildcards * and > are supported.

For inserts, messages are published to this subject unless overridden by the _nats_subject column.
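
As a rough illustration of a wildcard subject, the sketch below creates an external stream over every subject under orders. and reads from it. In NATS, * matches exactly one subject token and > matches one or more trailing tokens. The names orders_all, orders_stream, and orders.> are hypothetical, and the sketch assumes a JetStream stream that captures those subjects already exists.

```sql
-- Hypothetical names; assumes the JetStream stream 'orders_stream' already
-- exists on the server and captures subjects matching 'orders.>'.
CREATE EXTERNAL STREAM orders_all (raw string)
SETTINGS type='nats_jetstream',
         url='nats://localhost:4222',
         stream_name='orders_stream',
         subject='orders.>';

-- Read messages from all matching subjects.
SELECT raw FROM orders_all;
```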

consumer_stall_timeout_ms

Stall detection timeout in milliseconds. If no progress is made for this duration, Timeplus will recreate the subscription to recover from potential stalls.

Default: 60000
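
For illustration, the default can be overridden per external stream. The stream and subject names below are hypothetical, and the 5-minute value is only an example, not a recommendation.

```sql
-- Hypothetical names; raises the stall detection timeout from the
-- default 60000 ms to 300000 ms (5 minutes).
CREATE EXTERNAL STREAM slow_feed (raw string)
SETTINGS type='nats_jetstream',
         url='nats://localhost:4222',
         stream_name='slow_feed_stream',
         subject='slow.feed',
         consumer_stall_timeout_ms=300000;
```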

Authentication Settings

Timeplus supports multiple authentication mechanisms for NATS. Only one method can be used at a time.

username / password

Username and password authentication.

token

Token-based authentication.
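
A minimal sketch of the two mechanisms, using placeholder credentials and hypothetical stream names. Each external stream uses exactly one method, as noted above.

```sql
-- Option 1: username/password (placeholder credentials).
CREATE EXTERNAL STREAM nats_userpass (raw string)
SETTINGS type='nats_jetstream',
         url='nats://localhost:4222',
         stream_name='my_stream',
         subject='my.subject',
         username='my_username',
         password='my_password';

-- Option 2: token (placeholder token). Do not combine with username/password.
CREATE EXTERNAL STREAM nats_token (raw string)
SETTINGS type='nats_jetstream',
         url='nats://localhost:4222',
         stream_name='my_stream',
         subject='my.subject',
         token='my_token';
```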

TLS Settings

secure

Set to true to use a secure (SSL/TLS) connection.

ssl_ca_cert_file

Path to the CA certificate file for TLS verification.

skip_ssl_cert_check

Set to true to skip server certificate verification.

warning

Skipping verification is acceptable in test environments, but it leaves the connection open to impersonation of the server. Avoid it in production.

ssl_cert_file / ssl_key_file

For mTLS (mutual TLS), provide the paths to both the client certificate file and the private key file. Both settings must be specified together.

The certificates must be in PEM format and must be sorted starting with the subject's certificate, followed by intermediate CA certificates if applicable, and ending at the highest level (root) CA.

The private key file format supported is also PEM.
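
Putting the TLS settings together, an mTLS connection might be declared as in the sketch below. The host name, stream names, and file paths are hypothetical. The files are assumed to be PEM-encoded and readable by the Timeplus server process.

```sql
-- Hypothetical host and paths; all three files are PEM-encoded.
CREATE EXTERNAL STREAM secure_events (raw string)
SETTINGS type='nats_jetstream',
         url='nats://nats.example.com:4222',
         stream_name='secure_stream',
         subject='secure.events',
         secure=true,                                   -- enable TLS
         ssl_ca_cert_file='/etc/timeplus/tls/ca.pem',   -- verifies the server
         ssl_cert_file='/etc/timeplus/tls/client.pem',  -- client certificate chain
         ssl_key_file='/etc/timeplus/tls/client.key';   -- client private key
```

For server-only TLS, ssl_cert_file and ssl_key_file would be left out together.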

Data Format Settings

data_format

Defines how NATS messages are parsed and written.

Common formats include:

Format            Description
RawBLOB           Raw text, no parsing
JSONEachRow       One JSON document per line
CSV               Comma-separated values
TSV               Tab-separated values
ProtobufSingle    One Protobuf message per NATS message
Protobuf          Multiple Protobuf messages per NATS message
Avro              Avro-encoded messages

format_schema

Required for ProtobufSingle, Protobuf, and Avro formats. Defines the schema for message serialization.

one_message_per_row

Set to true to ensure each NATS message maps to exactly one JSON document, especially when writing with JSONEachRow.

info

When a _tp_message_headers column is defined, one_message_per_row must be true. Timeplus sets it automatically in that case.

Other Settings

config_file

Path to a configuration file containing key-value pairs. Useful for managing credentials securely, especially in Kubernetes environments with secrets managed via HashiCorp Vault.

Example config file:

username=my_username
password=my_password
data_format=JSONEachRow
one_message_per_row=true
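
An external stream could then point at that file instead of repeating the values in the DDL. This is a sketch only: the file path, stream names, and columns are hypothetical, and the file is assumed to be readable by the Timeplus server process.

```sql
-- Hypothetical path and names. Credentials and format settings come from
-- the config file, so they do not appear in the DDL.
CREATE EXTERNAL STREAM nats_from_file (
    id int32,
    name string
)
SETTINGS type='nats_jetstream',
         url='nats://localhost:4222',
         stream_name='my_stream',
         subject='my.subject',
         config_file='/etc/timeplus/nats.properties';
```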

named_collection

Named Collections allow you to group shared configuration settings into a reusable object. This simplifies DDL and enhances security by masking sensitive information.

Example:

CREATE NAMED COLLECTION nats_nc AS
url='nats://localhost:4222',
username='admin_user',
password='admin';

CREATE EXTERNAL STREAM test_nats_es(raw string)
SETTINGS
type='nats_jetstream',
stream_name='my_stream',
subject='my.subject',
named_collection='nats_nc';

For more details, refer to the Named Collection documentation.

Write Data to NATS JetStream

Timeplus supports writing data to NATS JetStream using various encoding formats such as strings, JSON, CSV, TSV, Avro, and Protobuf. You can write to NATS JetStream using SQL INSERT statements, the Ingest REST API, or as the target of a Materialized View.

Write as Raw String

You can encode data as a raw string in NATS messages:

CREATE EXTERNAL STREAM ext_github_events (raw string)
SETTINGS type='nats_jetstream',
url='nats://localhost:4222',
stream_name='github_events',
subject='github.events';

You can then write data via:

  • INSERT INTO ext_github_events VALUES ('some string')
  • Ingest REST API
  • Materialized View

info

Internally, the data_format is RawBLOB, and one_message_per_row=true by default.

Write as JSONEachRow

Encode each row as a separate JSON object (JSONL format):

CREATE EXTERNAL STREAM target(
url string,
method string,
ip string)
SETTINGS type='nats_jetstream',
url='nats://localhost:4222',
stream_name='events_stream',
subject='masked.events',
data_format='JSONEachRow',
one_message_per_row=true;

The messages will be published to the JetStream subject 'masked.events' as:

{
"url":"https://www.example.io/methodologies/killer/web-readiness",
"method":"POST",
"ip":"c4ecf59a9ec27b50af9cc3bb8289e16c"
}

info

By default, multiple JSON documents may be inserted into the same NATS message when using JSONEachRow. Set one_message_per_row=true to ensure each NATS message contains exactly one JSON document.
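
To feed the target stream continuously rather than with individual INSERT statements, a Materialized View can write into it. The sketch below assumes Timeplus's CREATE MATERIALIZED VIEW ... INTO ... AS SELECT form and a hypothetical source stream frontend_events that already has url, method, and ip columns.

```sql
-- Hypothetical view and source stream names. Every row produced by the
-- SELECT is written to the 'target' external stream and published to NATS
-- as JSON.
CREATE MATERIALIZED VIEW mv_to_nats INTO target AS
SELECT url, method, ip
FROM frontend_events;
```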

Write as CSV / TSV

Each row is encoded as one CSV/TSV line.
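
A minimal CSV sketch, with hypothetical stream, subject, and column names. Switching data_format to 'TSV' would produce tab-separated lines instead.

```sql
-- Hypothetical names; each inserted row becomes one CSV line.
CREATE EXTERNAL STREAM csv_out (
    id int32,
    name string
)
SETTINGS type='nats_jetstream',
         url='nats://localhost:4222',
         stream_name='csv_stream',
         subject='csv.rows',
         data_format='CSV';

INSERT INTO csv_out (id, name) VALUES (1, 'John');
```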

Write as Protobuf / Avro

To write Protobuf-encoded or Avro-encoded messages, refer to the Schema documentation.

Specify Subject with _nats_subject

By default, messages are published to the subject defined in the external stream's subject setting.

You can use the _nats_subject column to route messages to different NATS subjects dynamically, overriding the default subject setting:

CREATE EXTERNAL STREAM foo (
id int32,
name string,
_nats_subject string
) SETTINGS type='nats_jetstream',
url='nats://localhost:4222',
stream_name='multi_subject_stream',
subject='users.*';

Insert data with different subjects:

INSERT INTO foo(id, name, _nats_subject) VALUES (1, 'John', 'users.john');
INSERT INTO foo(id, name, _nats_subject) VALUES (2, 'Jane', 'users.jane');

The messages are published to the subjects users.john and users.jane, respectively.

info

The JetStream stream must have subjects configured to accept messages on these dynamic subjects. Ensure the stream's subject filter matches all possible _nats_subject values.

The _nats_subject column can be nullable. When its value is null or empty, the message is published to the default subject.
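
The fallback behaviour can be sketched with a nullable column. The stream name users_out and the default subject users.default are hypothetical, and the JetStream stream is assumed to accept both users.alex and users.default.

```sql
-- Hypothetical names; _nats_subject is declared nullable.
CREATE EXTERNAL STREAM users_out (
    id int32,
    name string,
    _nats_subject nullable(string)
) SETTINGS type='nats_jetstream',
           url='nats://localhost:4222',
           stream_name='multi_subject_stream',
           subject='users.default';

-- Explicit subject: published to users.alex.
INSERT INTO users_out (id, name, _nats_subject) VALUES (3, 'Alex', 'users.alex');

-- NULL subject: falls back to the default subject, users.default.
INSERT INTO users_out (id, name, _nats_subject) VALUES (4, 'Sam', NULL);
```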

Write NATS Message Headers

Add NATS headers via _tp_message_headers (map of key-value pairs):

CREATE EXTERNAL STREAM example (
s string,
i int,
_tp_message_headers map(string, string)
) SETTINGS type='nats_jetstream',
url='nats://localhost:4222',
stream_name='headers_stream',
subject='headers.data';

Insert data with headers:

INSERT INTO example(s, i, _tp_message_headers) VALUES ('data', 42, map('trace_id', 'abc123', 'content-type', 'application/json'));
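
Because one_message_per_row is set to true automatically for streams with a _tp_message_headers column, each inserted row becomes its own NATS message with its own headers. The sketch below reuses the example stream defined above. The header values are made up for illustration.

```sql
-- Two rows in one INSERT: each row is published as a separate NATS message
-- carrying the headers given in its own map.
INSERT INTO example (s, i, _tp_message_headers) VALUES
    ('first', 1, map('trace_id', 'abc123')),
    ('second', 2, map('trace_id', 'def456'));
```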