NATS JetStream Sink
Overview
NATS is a high-performance, lightweight messaging system. NATS JetStream is the built-in streaming layer for NATS that provides durable, replayable message streams with advanced features like message acknowledgment, persistence, and consumer management.
Timeplus provides first-class integration for NATS JetStream as a new type of External Stream. You can read or write data in NATS JetStream using SQL queries, similar to how you work with Kafka or Pulsar external streams.
Create NATS JetStream External Stream
Use the following SQL command to create a NATS JetStream external stream:
CREATE EXTERNAL STREAM [IF NOT EXISTS] <stream_name>
(<col_name1> <col_type>)
SETTINGS
type='nats_jetstream', -- required
url='nats://host:port', -- required
stream_name='..', -- required
subject='..', -- required
consumer_stall_timeout_ms=..,
username='..',
password='..',
token='..',
secure=<true|false>,
ssl_ca_cert_file='..',
skip_ssl_cert_check=<true|false>,
ssl_cert_file='..',
ssl_key_file='..',
data_format='..',
format_schema='..',
one_message_per_row=..,
config_file='..',
named_collection='..';
Settings
type
Must be set to nats_jetstream.
url
The NATS server URL.
Example: nats://localhost:4222
stream_name
The name of the JetStream stream to connect to. The stream must exist on the NATS server before you create the external stream. Timeplus validates that the stream exists during creation.
subject
The NATS subject to subscribe to or publish to. The wildcards * and > are supported.
For inserts, messages are published to this subject unless overridden by the _nats_subject column.
consumer_stall_timeout_ms
Stall detection timeout in milliseconds. If no progress is made for this duration, Timeplus will recreate the subscription to recover from potential stalls.
Default: 60000
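Putting the required settings together, a minimal definition might look like the sketch below. The names orders_es, ORDERS, and orders.new are placeholders chosen for illustration, and the timeout value is arbitrary.

```sql
-- Hypothetical names: orders_es, ORDERS, and orders.new are placeholders.
-- The JetStream stream ORDERS must already exist on the NATS server.
CREATE EXTERNAL STREAM IF NOT EXISTS orders_es (raw string)
SETTINGS
    type='nats_jetstream',
    url='nats://localhost:4222',
    stream_name='ORDERS',
    subject='orders.new',
    consumer_stall_timeout_ms=30000; -- optional; the default is 60000
```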
Authentication Settings
Timeplus supports multiple authentication mechanisms for NATS. Only one method can be used at a time.
username / password
Username and password authentication.
token
Token-based authentication.
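As a sketch of the two mechanisms, the definitions below use either username and password or a token, never both in the same stream. The stream names and credential values are placeholders, not real defaults.

```sql
-- Option 1: username/password authentication (placeholder credentials).
CREATE EXTERNAL STREAM nats_userpass_es (raw string)
SETTINGS
    type='nats_jetstream',
    url='nats://localhost:4222',
    stream_name='my_stream',
    subject='my.subject',
    username='my_username',
    password='my_password';

-- Option 2: token authentication (placeholder token).
CREATE EXTERNAL STREAM nats_token_es (raw string)
SETTINGS
    type='nats_jetstream',
    url='nats://localhost:4222',
    stream_name='my_stream',
    subject='my.subject',
    token='my_secret_token';
```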
TLS Settings
secure
Set to true to use a secure (SSL/TLS) connection.
ssl_ca_cert_file
Path to the CA certificate file for TLS verification.
skip_ssl_cert_check
Set to true to skip server certificate verification.
This is acceptable for testing, but it is insecure and should be avoided in production.
ssl_cert_file / ssl_key_file
For mTLS (mutual TLS), provide the paths to both the client certificate file and the private key file. Both settings must be specified together.
The certificates must be in PEM format and ordered starting with the subject's certificate, followed by any intermediate CA certificates, and ending with the highest-level (root) CA certificate.
The private key file must also be in PEM format.
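The TLS settings above could be combined for an mTLS connection roughly as follows. This is a sketch: the host name, stream name, and all file paths are assumptions for illustration.

```sql
-- Hypothetical host and file paths; adjust to your environment.
CREATE EXTERNAL STREAM nats_mtls_es (raw string)
SETTINGS
    type='nats_jetstream',
    url='nats://nats.example.internal:4222',
    stream_name='my_stream',
    subject='my.subject',
    secure=true,
    ssl_ca_cert_file='/etc/certs/ca.pem',      -- CA used to verify the server
    ssl_cert_file='/etc/certs/client.pem',     -- client certificate chain (PEM)
    ssl_key_file='/etc/certs/client-key.pem';  -- client private key (PEM)
```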
Data Format Settings
data_format
Defines how NATS messages are parsed and written.
Common formats include:
| Format | Description |
|---|---|
| RawBLOB | Raw text, no parsing |
| JSONEachRow | One JSON document per line |
| CSV | Comma-separated values |
| TSV | Tab-separated values |
| ProtobufSingle | One Protobuf message per NATS message |
| Protobuf | Multiple Protobuf messages per NATS message |
| Avro | Avro-encoded messages |
format_schema
Required for ProtobufSingle, Protobuf, and Avro formats. Defines the schema for message serialization.
one_message_per_row
Set to true to ensure each NATS message maps to exactly one JSON document, especially when writing with JSONEachRow.
When a _tp_message_headers column is defined, one_message_per_row must be true, and Timeplus sets it automatically.
Other Settings
config_file
Path to a configuration file containing key-value pairs. Useful for managing credentials securely, especially in Kubernetes environments with secrets managed via HashiCorp Vault.
Example config file:
username=my_username
password=my_password
data_format=JSONEachRow
one_message_per_row=true
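A stream definition could then point at such a file instead of inlining the credentials and format settings. In this sketch the file path, stream name, and columns are hypothetical.

```sql
-- Hypothetical path; the file holds username, password, data_format,
-- and one_message_per_row as shown in the example config file.
CREATE EXTERNAL STREAM nats_from_config_es (
    id int32,
    name string
)
SETTINGS
    type='nats_jetstream',
    url='nats://localhost:4222',
    stream_name='my_stream',
    subject='my.subject',
    config_file='/etc/timeplus/nats.properties';
```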
named_collection
Named Collections allow you to group shared configuration settings into a reusable object. This simplifies DDL and enhances security by masking sensitive information.
Example:
CREATE NAMED COLLECTION nats_nc AS
url='nats://localhost:4222',
username='admin_user',
password='admin';
CREATE EXTERNAL STREAM test_nats_es(raw string)
SETTINGS
type='nats_jetstream',
stream_name='my_stream',
subject='my.subject',
named_collection='nats_nc';
For more details, refer to the Named Collection documentation.
Write Data to NATS JetStream
Timeplus supports writing data to NATS JetStream using various encoding formats such as strings, JSON, CSV, TSV, Avro, and Protobuf. You can write to NATS JetStream using SQL INSERT statements, the Ingest REST API, or as the target of a Materialized View.
Write as Raw String
You can encode data as a raw string in NATS messages:
CREATE EXTERNAL STREAM ext_github_events (raw string)
SETTINGS type='nats_jetstream',
url='nats://localhost:4222',
stream_name='github_events',
subject='github.events'
You can then write data via:
- INSERT INTO ext_github_events VALUES ('some string')
- Ingest REST API
- Materialized View
Internally, the data_format is RawBLOB, and one_message_per_row=true by default.
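For the Materialized View option, a minimal sketch could look like the following. It assumes a source stream named github_events_source with a raw string column, which is hypothetical, and uses the INTO clause to set the external stream as the target.

```sql
-- github_events_source is a hypothetical Timeplus stream with a `raw` column.
-- Each row produced by the query is published to NATS JetStream
-- through the ext_github_events external stream.
CREATE MATERIALIZED VIEW mv_github_to_nats
INTO ext_github_events
AS
SELECT raw
FROM github_events_source;
```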
Write as JSONEachRow
Encode each row as a separate JSON object (JSONL format):
CREATE EXTERNAL STREAM target(
url string,
method string,
ip string)
SETTINGS type='nats_jetstream',
url='nats://localhost:4222',
stream_name='events_stream',
subject='masked.events',
data_format='JSONEachRow',
one_message_per_row=true;
The messages will be published to the JetStream subject 'masked.events' as:
{
"url":"https://www.example.io/methodologies/killer/web-readiness",
"method":"POST",
"ip":"c4ecf59a9ec27b50af9cc3bb8289e16c"
}
By default, multiple JSON documents may be written into the same NATS message when using JSONEachRow. Set one_message_per_row=true to ensure each NATS message contains exactly one JSON document.
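For reference, an insert like the one below into the target stream defined above would be expected to produce the JSON message shown. The values are simply the ones from that sample message.

```sql
-- Writes one row; with one_message_per_row=true this becomes one NATS message.
INSERT INTO target (url, method, ip)
VALUES (
    'https://www.example.io/methodologies/killer/web-readiness',
    'POST',
    'c4ecf59a9ec27b50af9cc3bb8289e16c'
);
```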
Write as CSV / TSV
Each row is encoded as one CSV/TSV line.
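A CSV-encoded stream might be defined as in this sketch, which reuses the settings from the JSONEachRow example and only changes data_format. The stream name target_csv and the subject are placeholders; use data_format='TSV' for tab-separated output.

```sql
-- Hypothetical stream name and subject; columns mirror the JSON example.
CREATE EXTERNAL STREAM target_csv (
    url string,
    method string,
    ip string
)
SETTINGS
    type='nats_jetstream',
    url='nats://localhost:4222',
    stream_name='events_stream',
    subject='masked.events.csv',
    data_format='CSV';

-- Each inserted row is encoded as one CSV line.
INSERT INTO target_csv (url, method, ip)
VALUES ('https://www.example.io/', 'GET', 'c4ecf59a9ec27b50af9cc3bb8289e16c');
```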
Write as Protobuf / Avro
To write Protobuf-encoded or Avro-encoded messages, refer to the Schema documentation.
Specify Subject with _nats_subject
By default, messages are published to the subject defined by the subject setting of the external stream.
You can use the _nats_subject column to route messages to different NATS subjects dynamically, overriding the default subject setting:
CREATE EXTERNAL STREAM foo (
id int32,
name string,
_nats_subject string
) SETTINGS type='nats_jetstream',
url='nats://localhost:4222',
stream_name='multi_subject_stream',
subject='users.*';
Insert data with different subjects:
INSERT INTO foo(id, name, _nats_subject) VALUES (1, 'John', 'users.john');
INSERT INTO foo(id, name, _nats_subject) VALUES (2, 'Jane', 'users.jane');
Messages will be published to the subjects users.john and users.jane, respectively.
The JetStream stream must have subjects configured to accept messages on these dynamic subjects. Ensure the stream's subject filter matches all possible _nats_subject values.
The _nats_subject column can be nullable. When its value is null or empty, the message is published to the default subject.
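The fallback behavior could be exercised with a nullable column as in the sketch below. The stream name foo_nullable and the default subject users.default are assumptions, and the JetStream stream is assumed to accept every subject used here.

```sql
-- Hypothetical stream; the default subject is a concrete subject
-- so that rows without _nats_subject have a valid publish target.
CREATE EXTERNAL STREAM foo_nullable (
    id int32,
    name string,
    _nats_subject nullable(string)
) SETTINGS type='nats_jetstream',
    url='nats://localhost:4222',
    stream_name='multi_subject_stream',
    subject='users.default';

-- Routed to users.john via the _nats_subject column.
INSERT INTO foo_nullable (id, name, _nats_subject) VALUES (1, 'John', 'users.john');

-- _nats_subject is null, so the default subject users.default is used.
INSERT INTO foo_nullable (id, name, _nats_subject) VALUES (2, 'Jane', NULL);
```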
Write NATS Message Headers
Add NATS headers via _tp_message_headers (map of key-value pairs):
CREATE EXTERNAL STREAM example (
s string,
i int,
_tp_message_headers map(string, string)
) SETTINGS type='nats_jetstream',
url='nats://localhost:4222',
stream_name='headers_stream',
subject='headers.data';
Insert data with headers:
INSERT INTO example(s, i, _tp_message_headers) VALUES ('data', 42, map('trace_id', 'abc123', 'content-type', 'application/json'));