NATS и потоковая передача NATS — клиент Cи

https://github.com/nats-io/nats.c

Клиент AC для системы обмена сообщениями NATS .

Перейдите сюда для онлайн-документации, и проверьте часто задаваемые вопросы .

Эта реализация клиента NATS во многом основана на клиенте NATS GO . Существует поддержка Mac OS/X, Linux и Windows (хотя у нас нет конкретной матрицы поддержки платформ).

Лицензия Апач 2 Статус сборки Статус покрытия Выпуск Документация Всего предупреждений Языковой уровень: C/C++

Оглавление

Установка

Доступно несколько менеджеров пакетов с клиентской библиотекой NATS C. Если вы знаете что-то, чего нет в этом списке, отправьте PR, чтобы добавить его!

Строительство

Сначала загрузите исходный код:

git clone git@github.com:nats-io/nats.c.git .

Для сборки библиотеки используйте CMake . Обратите внимание, что по умолчанию NATS Streaming API будет создан и включен в библиотеку NATS. См. ниже, если вы не хотите создавать API-интерфейсы, связанные с потоковой передачей.

Убедитесь, что CMake добавлен к вашему пути. При сборке в Windows откройте командную оболочку из меню Инструменты Visual Studio и выберите соответствующую командную оболочку (x64 или x86 для 64- или 32-разрядных сборок соответственно). Вам также, вероятно, потребуется запустить это с правами администратора.

Создать buildкаталог (подойдет любое имя) из корневого исходного дерева и cdвнутрь. Затем введите эту команду в первый раз:

cmake ..

В некоторых архитектурах вы можете столкнуться с ошибкой компиляции для mutex.c.oпотому что нет поддержки для ассемблерной инструкции, которую мы используем для выхода при вращении, пытающемся получить блокировку.

Вы можете получить такую ​​ошибку сборки:

/tmp/cc1Yp7sD.s: Assembler messages:
/tmp/cc1Yp7sD.s:302: Error: selected processor does not support ARM mode `yield'
src/CMakeFiles/nats_static.dir/build.make:542: recipe for target 'src/CMakeFiles/nats_static.dir/unix/mutex.c.o' failed

Если это так, вы можете решить эту проблему, включив NATS_BUILD_NO_SPINфлаг (или используйте -DNATS_NO_SPINесли вы компилируете без CMake):

cmake .. -DNATS_BUILD_NO_SPIN=ON

Если вы ранее создали библиотеку, вам может потребоваться выполнить make cleanили просто удалите и заново создайте каталог сборки перед выполнением команды cmake.

Для сборки в Windows вам нужно будет выбрать генератор сборки. Например, для выбора nmake, вы бы запустили:

cmake .. -G "NMake Makefiles"

Бег cmake -hдаст вам список возможных вариантов и все имена генераторов.

Кроме того, вы можете запустить версию с графическим интерфейсом. Из той же сборки командной оболочки

c:\program files (x86)\CMake\bin\cmake-gui.exe

Если вы начали с пустого каталога сборки, вам нужно будет выбрать исходный каталог и каталог сборки, а затем нажать Configure. Здесь вы сможете выбрать из раскрывающегося списка имя генератора сборки. Когда закончите, нажмите Generate. Затем вы можете вернуться в свою командную оболочку или Visual Studio и выполнить сборку.

Чтобы изменить некоторые параметры сборки, вам нужно отредактировать кеш и пересобрать.

make edit_cache

Обратите внимание, что если вы строите в Windows и выбрали «NMake Makefiles», замените все следующие ссылки на makeс участием nmake.

Редактирование кеша позволяет выбрать тип сборки (отладка, выпуск и т. д.), архитектуру (64 или 32 бит) и т. д.

Целевой объект по умолчанию будет собирать все, то есть статические и общие библиотеки NATS, а также примеры и тестовую программу. Каждый из них находится в своих соответствующих каталогах в вашем каталоге сборки: src, examplesа также test.

make install

Скопирует как статические, так и общие библиотеки в папку install/libи общедоступные заголовки в install/include.

Поддержка TLS

По умолчанию библиотека построена с поддержкой TLS. Вы можете отключить это из cmake gui make edit_cacheи переключите NATS_BUILD_WITH_TLSвозможность OFF, или передать параметр непосредственно в cmakeкоманда:

cmake .. -DNATS_BUILD_WITH_TLS=OFF

Начиная 2.0.0, при сборке с поддержкой TLS/SSL ожидаемое имя хоста сертификата сервера всегда проверяется. Это означает, что имя хоста, указанное в URL-адресах или через опцию natsOptions_SetExpectedHostname()будет использоваться для проверки имени хоста, присутствующего в сертификате. До 2.0.0, имя хоста будет проверено, только если опция natsOptions_SetExpectedHostname()был вызван.

Хотя мы рекомендуем оставить новое поведение по умолчанию, вы можете восстановить предыдущее поведение, создав библиотеку с выключенным параметром:

cmake .. -DNATS_BUILD_TLS_FORCE_HOST_VERIFY=OFF

Клиент NATS C создан с использованием API из OpenSSL библиотеки По умолчанию мы используем 1.0.2API. Вы можете скомпилировать клиент NATS C с версией OpenSSL API. 1.1+. Для этого нужно включить NATS_BUILD_TLS_USE_OPENSSL_1_1_APIвариант:

cmake .. -DNATS_BUILD_TLS_USE_OPENSSL_1_1_API=ON

Поскольку клиент NATS C динамически связывается с библиотекой OpenSSL, вам необходимо убедиться, что вы затем запускаете свое приложение с библиотекой OpenSSL 1.1+.

Обратите внимание, что вариант NATS_BUILD_WITH_TLS_CLIENT_METHODне рекомендуется. Его цель состояла в том, чтобы клиент NATS C использовал метод, представленный в OpenSSL. 1.1+. Новый вариант NATS_BUILD_TLS_USE_OPENSSL_1_1_APIявляется более общим и заменяет NATS_BUILD_WITH_TLS_CLIENT_METHOD. Если вы используете сценарии для автоматизации процесса сборки, использующего NATS_BUILD_WITH_TLS_CLIENT_METHOD, они по-прежнему будут работать, и использование этого устаревшего параметра будет иметь тот же эффект, что и установка NATS_BUILD_TLS_USE_OPENSSL_1_1_APIк ON.

Статическая ссылка

Если вы хотите связать статическую библиотеку OpenSSL, вам нужно удалить CMakeCache.txtи регенерируйте его с дополнительной опцией:

rm CMakeCache.txt
cmake .. <build options that you may already use> -DNATS_BUILD_OPENSSL_STATIC_LIBS=ON

Тогда позвони make(или эквивалент в зависимости от вашей платформы), и это должно гарантировать, что библиотека (и примеры и/или исполняемый набор тестов) связаны с библиотекой OpenSSL, если она была найдена CMake.

Строительство с потоковой передачей

При сборке библиотеки с поддержкой потоковой передачи библиотека NATS использует библиотеку libprotobuf-c . Когда cmake запускается в первый раз (или после удаления CMakeCache.txtи звоню cmake ..снова), он ищет библиотеку libprotobuf-c. Если он не находит его, печатается сообщение, и процесс сборки завершается сбоем. CMake ищет библиотеку в каталогах, где обычно находятся библиотеки. Однако, если вы хотите указать конкретный каталог, в котором находится библиотека, вам нужно сделать это:

cmake .. -DNATS_PROTOBUF_DIR=<my libprotobuf-c directory>

Статическая библиотека будет использоваться по умолчанию. Если вы хотите изменить это, или если у библиотеки не ожидаемое имя, вам нужно сделать это:

# Use the library named mylibproto.so located at /my/location
cmake .. -DNATS_PROTOBUF_LIBRARY=/my/location/mylibproto.so

Их можно объединить, если заголовок include находится в другом каталоге.

# Use the library named mylibproto.so located at /my/location and the directory protobuf-c/ containing protobuf-c.h located at /my/other/location
cmake .. -DNATS_PROTOBUF_LIBRARY=/my/location/mylibproto.so -DNATS_PROTOBUF_DIR=/my/other/location

Если вы не хотите создавать API потоковой передачи NATS для включения в библиотеку NATS:

cmake .. -DNATS_BUILD_STREAMING=OFF

Строительство с Libsodium

При использовании новых функций безопасности NATS 2.0 библиотеке необходимо подписать некоторый «одноразовый номер», отправленный сервером во время подключения или повторного подключения. Мы используем Ed25519 с открытым ключом Библиотека поставляется с некоторым кодом для выполнения подписи. В большинстве случаев все будет хорошо, но если производительность является проблемой (особенно если вы планируете использовать natsConnection_Sign()много функций), у вас будет возможность сборки с Libsodium .

Следуйте инструкциям по установке библиотеки libsodium здесь .

В macOS вы можете использовать brew:

brew install libsodium

В Linux вы можете использовать apt-get

apt-get install libsodium-dev

После установки вы можете перестроить клиент NATS C, сначала включив использование библиотеки libsodium:

cmake .. -DNATS_BUILD_USE_SODIUM=ON

Если у вас установлена ​​библиотека libsodium в нестандартном месте, которое CMake не может найти, вы можете указать расположение этого каталога:

cmake .. -DNATS_BUILD_USE_SODIUM=ON -DNATS_SODIUM_DIR=/my/path/to/libsodium

Тестирование

На платформах, где valgrindдоступен, вы можете запускать тесты с проверкой памяти. Вот пример:

make test ARGS="-T memcheck"

Или вы можете вызвать непосредственно ctestпрограмма:

ctest -T memcheck -V -I 1,4

Приведенная выше команда запустит тесты с valgrind ( -T memcheck), с подробным выводом ( -V) и запустить тесты с 1 по 4 ( -I 1,4).

Если вы добавите тест в test/test.c, вам нужно добавить его в allTestsмножество. Каждая запись содержит имя и тестовую функцию. Вы можете добавить его в любом месте этого массива. Создайте свои изменения:

make
[ 44%] Built target nats
[ 88%] Built target nats_static
[ 90%] Built target nats-publisher
[ 92%] Built target nats-queuegroup
[ 94%] Built target nats-replier
[ 96%] Built target nats-requestor
[ 98%] Built target nats-subscriber
Scanning dependencies of target testsuite
[100%] Building C object test/CMakeFiles/testsuite.dir/test.c.o
Linking C executable testsuite
[100%] Built target testsuite

Теперь восстановите список, вызвав набор тестов без каких-либо аргументов:

./test/testsuite
Number of tests: 77

В этом списке указано количество тестов, добавленных в файл. list.txt. Переместите этот файл в тестовый каталог источника.

mv list.txt ../test/

Затем обновите сборку:

cmake ..
-- Configuring done
-- Generating done
-- Build files have been written to: /home/ivan/nats.c/build

Вы можете использовать следующие переменные среды, чтобы влиять на поведение набора тестов.

При работе с проверкой памяти время изменяется, и общая производительность снижается. Следующая переменная позволяет набору тестов настроить некоторые значения, используемые во время теста:

export NATS_TEST_VALGRIND=yes

В Windows это будет setвместо export.

При запуске тестов в подробном режиме следующая переменная среды позволяет вам видеть вывод сервера из самого теста. Без этой опции вывод сервера отключен:

export NATS_TEST_KEEP_SERVER_OUTPUT=yes

Если вы хотите изменить имя исполняемого файла сервера по умолчанию ( nats-server.exe) или укажите конкретное местоположение, используйте эту переменную среды:

set NATS_TEST_SERVER_EXE=c:\test\nats-server.exe

Документация

Публичный API был задокументирован с помощью Doxygen .

Чтобы сгенерировать документацию, перейдите на docкаталог и введите следующую команду:

doxygen DoxyFile.NATS.Client

Если вы переключаете сборку Streaming API, а документация больше не соответствует что строится, вы можете обновить документацию, переключив NATS_UPDATE_DOCпостроить флаг и перестроить документацию.

Из каталога сборки:

cmake .. -DNATS_UPDATE_DOC=ON
make
cd <nats.c root dir>/doc
doxygen DoxyFile.NATS.Client

Сгенерированная документация будет находиться в htmlкаталог. Чтобы просмотреть документацию, укажите в браузере файл index.htmlв этом каталоге.

Перейдите сюда для онлайн-документации.

Исходный код также достаточно документирован.

NATS-клиент

Важные изменения

В этом разделе перечислены важные изменения, такие как уведомления об устаревании и т. д.

Версия 2.0.0

В этой версии представлены концепции безопасности, используемые сервером NATS. 2.0.0и, следовательно, соответствует версии сервера. Были введены новые API, но наиболее важным изменением является новое поведение по умолчанию для соединений TLS:

  • При установлении безопасного соединения имя хоста сертификата сервера теперь всегда проверяется, независимо от того, вызывал ли пользователь natsOptions_SetExpectedHostname(). Это может нарушить работу приложений, которые, например, использовали IP-адрес для подключения к серверу, у которого в сертификате было только имя хоста. Это можно решить, изменив ваше приложение, чтобы использовать имя хоста в URL-адресе или использовать natsOptions_SetExpectedHostname(). Если это невозможно, вы можете восстановить старое поведение, создав библиотеку с отключенным новым поведением. Дополнительные сведения см. в разделе #tls-support.

  • Этот репозиторий раньше включал предварительно скомпилированные библиотеки libprotobuf-c для macOS, Linux и Windows вместе с файлами заголовков (в /pbufкаталог). Теперь мы удалили этот каталог и требуем, чтобы пользователь отдельно установил библиотеку libprotobuf-c. См. инструкции по сборке, чтобы указать расположение библиотеки, если CMake не может найти ее напрямую.

Версия 1.8.0

  • В natsConnStatusзначения перечисления имеют префикс NATS_CONN_STATUS_. Если ваше приложение не используя ссылку на какое-либо исходное значение, например CONNECTEDили CLOSED, и т.д.. то нечего что вам нужно сделать. Если вы это сделаете, у вас есть два варианта:
    • Замените все ссылки из исходных значений на новое значение (добавляя префикс)
    • Скомпилируйте библиотеку с возможностью использования исходных значений (без префикса). Отредактируйте кеш CMake и включите опцию NATS_BUILD_NO_PREFIX_CONNSTS. Это можно сделать следующим образом из каталога сборки: cmake .. -DNATS_BUILD_NO_PREFIX_CONNSTS=ON

Начиная

В examples/getstartedВ каталоге есть набор простых примеров, которые полностью функциональны, но очень просты. Цель состоит в том, чтобы продемонстрировать, насколько прост в использовании API.

Более сложный набор примеров находится в examples/каталоге, а также может использоваться для тестирования клиентской библиотеки.

Основное использование

Обратите внимание, что для простоты проверка ошибок здесь не выполняется.

natsConnection      *nc  = NULL;
natsSubscription    *sub = NULL;
natsMsg             *msg = NULL;

// Connects to the default NATS Server running locally
natsConnection_ConnectTo(&nc, NATS_DEFAULT_URL);

// Connects to a server with username and password
natsConnection_ConnectTo(&nc, "nats://ivan:secret@localhost:4222");

// Connects to a server with token authentication
natsConnection_ConnectTo(&nc, "nats://myTopSecretAuthenticationToken@localhost:4222");

// Simple publisher, sending the given string to subject "foo"
natsConnection_PublishString(nc, "foo", "hello world");

// Publish binary data. Content is not interpreted as a string.
char data[] = {1, 2, 0, 4, 5};
natsConnection_Publish(nc, "foo", (const void*) data, 5);

// Simple asynchronous subscriber on subject foo, invoking message
// handler 'onMsg' when messages are received, and not providing a closure.
natsConnection_Subscribe(&sub, nc, "foo", onMsg, NULL);

// Simple synchronous subscriber
natsConnection_SubscribeSync(&sub, nc, "foo");

// Using a synchronous subscriber, gets the first message available, waiting
// up to 1000 milliseconds (1 second)
natsSubscription_NextMsg(&msg, sub, 1000);

// Destroy any message received (asynchronously or synchronously) or created
// by your application. Note that if 'msg' is NULL, the call has no effect.
natsMsg_Destroy(msg);

// Unsubscribing
natsSubscription_Unsubscribe(sub);

// Destroying the subscription (this will release the object, which may
// result in freeing the memory). After this call, the object must no
// longer be used.
natsSubscription_Destroy(sub);

// Publish requests to the given reply subject:
natsConnection_PublishRequestString(nc, "foo", "bar", "help!");

// Sends a request (internally creates an inbox) and Auto-Unsubscribe the
// internal subscriber, which means that the subscriber is unsubscribed
// when receiving the first response from potentially many repliers.
// This call will wait for the reply for up to 1000 milliseconds (1 second).
natsConnection_RequestString(&reply, nc, "foo", "help", 1000);

// Closing a connection (but not releasing the connection object)
natsConnection_Close(nc);

// When done with the object, free the memory. Note that this call
// closes the connection first, in other words, you could have simply
// this call instead of natsConnection_Close() followed by the destroy
// call.
natsConnection_Destroy(nc);

// Message handler
void
onMsg(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
{
    // Prints the message, using the message getters:
    printf("Received msg: %s - %.*s\n",
        natsMsg_GetSubject(msg),
        natsMsg_GetDataLength(msg),
        natsMsg_GetData(msg));

    // Don't forget to destroy the message!
    natsMsg_Destroy(msg);
}

ДжетСтрим

Поддержка JetStream начинается с версии v3.0.0библиотеки и сервера NATS v2.2.0+, хотя получение JetStream для конкретных кодов ошибок требуется версия сервера v2.3.0+. Некоторые параметры конфигурации доступны только при запуске v2.3.3, поэтому мы рекомендуем вам использовать последнюю версию сервера NATS, чтобы получить лучший опыт.

Посмотрите на примеры, названные js-xxx.cв examplesКаталог с примерами использования API. Новые объекты и API полностью задокументированы в электронной документации .

Основное использование JetStream

// Connect to NATS
natsConnection_Connect(&conn, opts);

// Initialize and set some JetStream options
jsOptions jsOpts;
jsOptions_Init(&jsOpts);
jsOpts.PublishAsync.MaxPending = 256;

// Create JetStream Context
natsConnection_JetStream(&js, conn, &jsOpts);

// Simple Stream Publisher
js_Publish(&pa, js, "ORDERS.scratch", (const void*) "hello", 5, NULL, &jerr);

// Simple Async Stream Publisher
for (i=0; i < 500; i++)
{
    js_PublishAsync(js, "ORDERS.scratch", (const void*) "hello", 5, NULL);
}

// Wait for up to 5 seconds to receive all publish acknowledgments.
jsPubOptions_Init(&jsPubOpts);
jsPubOpts.MaxWait = 5000;
js_PublishAsyncComplete(js, &jsPubOpts);

// One can get the list of all pending publish async messages,
// to either resend them or simply destroy them.
natsMsgList pending;
s = js_PublishAsyncGetPendingList(&pending, js);
if (s == NATS_OK)
{
    int i;

    for (i=0; i<pending.Count; i++)
    {

        // There could be a decision to resend these messages or not.
        if (your_decision_to_resend(pending.Msgs[i]))
        {

            // If the call is successful, pending.Msgs[i] will be set
            // to NULL so destroying the pending list will not destroy
            // this message since the library has taken ownership back.
            js_PublishMsgAsync(js, &(pending.Msgs[i]), NULL);
        }
    }

    // Destroy the pending list object and all messages still in that list.
    natsMsgList_Destroy(&pending);
}

// To create an asynchronous ephemeral consumer
js_Subscribe(&sub, js, "foo", myMsgHandler, myClosure, &jsOpts, NULL, &jerr);

// Same but use a subscription option to ask the callback to not do auto-ack.
jsSubOptions so;
jsSubOptions_Init(&so);
so.ManualAck = true;
js_Subscribe(&sub, js, "foo", myMsgHandler, myClosure, &jsOpts, &so, &jerr);

// Or to bind to an existing specific stream/durable:
jsSubOptions_Init(&so);
so.Stream = "MY_STREAM";
so.Consumer = "my_durable";
js_Subscribe(&sub, js, "foo", myMsgHandler, myClosure, &jsOpts, &so, &jerr);

// Synchronous subscription:
js_SubscribeSync(&sub, js, "foo", &jsOpts, &so, &jerr);

Базовое управление JetStream

jsStreamConfig  cfg;

// Connect to NATS
natsConnection_Connect(&conn, opts);

// Create JetStream Context
natsConnection_JetStream(&js, conn, NULL);

// Initialize the configuration structure.
jsStreamConfig_Init(&cfg);
// Provide a name
cfg.Name = "ORDERS";
// Array of subjects and its size
cfg.Subjects = (const char*[1]){"ORDERS.*"};
cfg.SubjectsLen = 1;

// Create a Stream. If you are not interested in the returned jsStreamInfo object,
// you can pass NULL.
js_AddStream(NULL, js, &cfg, NULL, &jerr);

// Update a Stream
cfg.MaxBytes = 8;
js_UpdateStream(NULL, js, &cfg, NULL, &jerr);

// Delete a Stream
js_DeleteStream(js, "ORDERS", NULL, &jerr);

ключ-значение

ЭКСПЕРИМЕНТАЛЬНАЯ ФУНКЦИЯ! Мы оставляем за собой право изменять API без обязательного изменения основной версии библиотеки.

Хранилище KeyValue — это материализованное представление, основанное на JetStream. Сегмент – это поток, а ключи – это объекты в этом потоке.

Для некоторых функций требуется сервер NATS. v2.6.2, поэтому мы рекомендуем вам использовать последнюю версию сервера NATS, чтобы получить больше удовольствия.

Новые объекты и API полностью задокументированы в электронной документации .

Управление ключевым значением

Пример создания хранилища KeyValue:

jsCtx       *js = NULL;
kvStore     *kv = NULL;
kvConfig    kvc;

// Assume we got a JetStream context in `js`...

kvConfig_Init(&kvc);
kvc.Bucket = "KVS";
kvc.History = 10;
s = js_CreateKeyValue(&kv, js, &kvc);

// Do some stuff...

// This is to free the memory used by `kv` object,
// not delete the KeyValue store in the server
kvStore_Destroy(kv);

Это показывает, как «привязать» к существующему:

jsCtx       *js = NULL;
kvStore     *kv = NULL;

// Assume we got a JetStream context in `js`...

s = js_KeyValue(&kv, ks, "KVS");

// Do some stuff...

// This is to free the memory used by `kv` object,
// not delete the KeyValue store in the server
kvStore_Destroy(kv);

Вот как удалить хранилище KeyValue на сервере:

jsCtx       *js = NULL;

// Assume we got a JetStream context in `js`...

s = js_DeleteKeyValue(js, "KVS");

API ключ-значение

Вот как указать значение для данного ключа:

kvStore     *kv = NULL;
uint64_t    rev = 0;

// Assume we got a kvStore...

s = kvStore_PutString(&rev, kv, "MY_KEY", "my value");

// If the one does not care about getting the revision, pass NULL:
s = kvStore_PutString(NULL, kv, "MY_KEY", "my value");

Вышеприведенное помещает значение для данного ключа, но если вместо этого кто-то хочет убедиться, что значение помещается для ключа, только если он никогда не существовал раньше, можно было бы вызвать:

// Same note than before: if "rev" is not needed, pass NULL:
s = kvStore_CreateString(&rev, kv, "MY_KEY", "my value");

Можно обновить ключ тогда и только тогда, когда последняя версия на сервере совпадает с той, которая была передана в этот API:

// This would update the key "MY_KEY" with the value "my updated value" only if the current revision (sequence number) for this key is 10.
s = kvStore_UpdateString(&rev, kv, "MY_KEY", "my updated value", 10);

Вот как получить ключ:

kvStore *kv = NULL;
kvEntry *e  = NULL;

// Assume we got a kvStore...

s = kvStore_Get(&e, kv, "MY_KEY");

// Then we can get some fields from the entry:
printf("Key value: %s\n", kvEntry_ValueString(e));

// Once done with the entry, we need to destroy it to release memory.
// This is NOT deleting the key from the server.
kvEntry_Destroy(e);

Вот как очистить ключ:

kvStore *kv = NULL;

// Assume we got a kvStore...

s = kvStore_Purge(kv, "MY_KEY");

Это удалит ключ на сервере:

kvStore *kv = NULL;

// Assume we got a kvStore...

s = kvStore_Delete(kv, "MY_KEY");

Чтобы создать «наблюдатель» для данного ключа:

kvWatcher       *w = NULL;
kvWatchOptions  o;

// Assume we got a kvStore...

// Say that we are not interested in getting the
// delete markers...

// Initialize a kvWatchOptions object:
kvWatchOptions_Init(&o);
o.IgnoreDeletes = true;
// Create the watcher
s = kvStore_Watch(&w, kv, "foo.*", &o);
// Check for error..

// Now get updates:
while (some_condition)
{
    kvEntry *e = NULL;

    // Wait for the next update for up to 5 seconds
    s = kvWatcher_Next(&e, w, 5000);

    // Do something with the entry...

    // Destroy to release memory
    kvEntry_Destroy(e);
}

// When done with the watcher, it needs to be destroyed to release memory:
kvWatcher_Destroy(w);

Чтобы получить историю ключа:

kvEntryList l;
int         i;

// The list is defined on the stack and will be initilized/updated by this call:
s = kvStore_History(&l, kv, "MY_KEY", NULL);

for (i=0; i<l.Count; i++)
{
    kvEntry *e = l.Entries[i];

    // Do something with the entry...
}
// When done with the list, call this to free entries and the content of the list.
kvEntryList_Destroy(&l);

// In order to set a timeout to get the history, we need to do so through options:
kvWatchOptions o;

kvWatchOptions_Init(&o);
o.Timeout = 5000; // 5 seconds.
s = kvStore_History(&l, kv, "MY_KEY", &o);

Вот как вы получите ключи от ведра:

kvKeysList  l;
int         i;

// If no option is required, pass NULL as the last argument.
s = kvStore_Keys(&l, kv, NULL);
// Check error..

// Go over all keys:
for (i=0; i<l.Count; i++)
    printf("Key: %s\n", l.Keys[i]);

// When done, list need to be destroyed.
kvKeysList_Destroy(&l);

// If option need to be specified:

kvWatchOptions o;

kvWatchOptions_Init(&o);
o.Timeout = 5000; // 5 seconds.
s = kvStore_Keys(&l, kv, &o);

Заголовки

Заголовки доступны при подключении к серверам версии 2.2.0+.

Они очень напоминают заголовки http. Они представляют собой карту пар ключ/значение, где значение представляет собой массив строк.

Заголовки позволяют пользователям добавлять метаинформацию о сообщении, не мешая полезной нагрузке сообщения.

Обратите внимание, что если приложение попытается отправить сообщение с заголовком при подключении к серверу, который их не понимает, вызов публикации вернет ошибку NATS_NO_SERVER_SUPPORT.

Существует API, чтобы узнать, поддерживает ли сервер, к которому в данный момент подключено, заголовки:

natsStatus s = natsConnection_HasHeaderSupport(conn);
if (s == NATS_NO_SERVER_SUPPORT)
    // deal with server not supporting this feature.

Если сервер понимает заголовки, но собирается доставить сообщение клиенту, который этого не понимает, заголовки удаляются, чтобы более старые клиенты могли по-прежнему получать сообщение. Важно, чтобы все клиентские и серверные версии поддерживали заголовки, если приложения полагаются на заголовки.

Для получения более подробной информации об API заголовков, пожалуйста, получите пример: examples/getstarted/headers.c.

Подписки с подстановочными знаками

В *подстановочный знак соответствует любому токену на любом уровне субъекта:

natsConnection_Subscribe(&sub, nc, "foo.*.baz", onMsg, NULL);

Этот подписчик будет получать сообщения, отправленные на:

  • foo.bar.baz
  • foo.a.baz
  • и т.д...

Однако он не будет получать сообщения о:

  • foo.baz
  • foo.baz.bar
  • и т.д...

В >подстановочный знак соответствует любой длине отказа субъекта и может быть только последним токеном.

natsConnection_Subscribe(&sub, nc, "foo.>", onMsg, NULL);

Этот подписчик получит любое сообщение, отправленное на:

  • foo.bar
  • foo.bar.baz
  • foo.foo.bar.bax.22
  • и т.д...

Однако он не будет получать сообщения, отправленные на:

  • фу
  • bar.foo.baz
  • и т.д...

Публикация на эту тему приведет к тому, что два вышеуказанных подписчика получат сообщение:

natsConnection_PublishString(nc, "foo.bar.baz", "got it?");

Группы очередей

Все подписки с одинаковым именем очереди образуют группу очереди. Каждое сообщение будет доставлено только одному подписчику на группу очереди с использованием сематики очереди. Вы можете иметь столько групп очередей, сколько пожелаете. Обычные подписчики продолжат работать как положено.

natsConnection_QueueSubscribe(&sub, nc, "foo", "job_workers", onMsg, NULL);

TLS

(Обратите внимание, что библиотека должна быть собрана с поддержкой TLS, которая включена по умолчанию, чтобы эти API работали. Дополнительные сведения см. в главе «Сборка» о сборке с TLS или без него).

Соединение SSL/TLS настраивается с помощью natsOptions. В зависимости от уровня безопасности, который вы хотите, это может быть так же просто, как установить для безопасного логического значения значение true на natsOptions_SetSecure()вызов.

Даже при полной безопасности (клиент проверяет сертификат сервера, а серверу требуются клиентские сертификаты) установка включает всего несколько вызовов.

// Here is the minimum to create a TLS/SSL connection:

// Create an options object.
natsOptions_Create(&opts);

// Set the secure flag.
natsOptions_SetSecure(opts, true);

// You may not need this, but suppose that the server certificate
// is self-signed and you would normally provide the root CA, but
// don't want to. You can disable the server certificate verification
// like this:
natsOptions_SkipServerVerification(opts, true);

// Connect now...
natsConnection_Connect(&nc, opts);

// That's it! On success you will have a secure connection with the server!

(...)

// This example shows what it takes to have a full SSL configuration,
// including server expected's hostname, root CA, client certificates
// and specific ciphers to use.

// Create an options object.
natsOptions_Create(&opts);

// Set the secure flag.
natsOptions_SetSecure(opts, true);

// For a server with a trusted chain built into the client host,
// simply designate the server name that is expected. Without this
// call, the server certificate is still verified, but not the
// hostname.
natsOptions_SetExpectedHostname(opts, "localhost");

// Instead, if you are using a self-signed cert and need to load in the CA.
natsOptions_LoadCATrustedCertificates(opts, caCertFileName);

// If the server requires client certificates, provide them along with the
// private key, all in one call.
natsOptions_LoadCertificatesChain(opts, certChainFileName, privateKeyFileName);

// You can also specify preferred ciphers if you want.
natsOptions_SetCiphers(opts, "-ALL:HIGH");

// Then simply pass the options object to the connect call:
natsConnection_Connect(&nc, opts);

// That's it! On success you will have a secure connection with the server!

Новая аутентификация (Nkeys и учетные данные пользователя)

Для этого требуется сервер с версией >= 2.0.0

Серверы NATS имеют новый механизм безопасности и аутентификации для аутентификации с использованием учетных данных пользователя и Nkeys. Самая простая форма - использовать вспомогательную опцию natsOptions_SetUserCredentialsFromFiles().

// Retrieve both user JWT and NKey seed from single file `user.creds`.
s = natsOptions_SetUserCredentialsFromFiles(opts, "user.creds", NULL);
if (s == NATS_OK)
    s = natsConnection_Connect(&nc, opts);

С этой опцией библиотека будет загружать пользовательский JWT и начальное число NKey из одного файла. Обратите внимание, что библиотека стирает буферы, используемые для чтения файлов.

Если вы предпочитаете хранить JWT и начальное число в двух разных файлах, используйте вместо этого эту форму:

// Retrieve the user JWT from the file `user.jwt` and the seed from the file `user.nk`.
s = natsOptions_SetUserCredentialsFromFiles(opts, "user.jwt", "user.nk");
if (s == NATS_OK)
    s = natsConnection_Connect(&nc, opts);

Вы также можете установить обработчики обратного вызова и напрямую управлять подписью вызова.

/*
 * myUserJWTCb is a callback that is supposed to return the user JWT.
 * An optional closure can be specified.
 * mySignatureCB is a callback that is presented with a nonce and is
 * responsible for returning the signature for this nonce.
 * An optional closure can be specified.
 */
s = natsOptions_SetUserCredentialsCallbacks(opts, myUserJWTCb, NULL, mySignatureCb, NULL);
if (s == NATS_OK)
    s = natsConnection_Connect(&nc, opts);

Для аутентификации NKey можно указать общедоступный NKey и файл, содержащий соответствующее начальное число NKey. При подключении библиотека загрузит этот файл, чтобы найти начальное число NKey и использовать его для подписи одноразового номера, отправленного сервером. Библиотека позаботится об очистке памяти, в которую скопировано семя, как только поскольку одноразовый номер подписан.

s = natsOptions_SetNKeyFromSeed(opts, "UDXU4RCSJNZOIQHZNWXHXORDPRTGNJAHAHFRGZNEEJCPQTT2M7NLCNF4", "seed.nk");
if (s == NATS_OK)
    s = natsConnection_Connect(&nc, opts);

Файл «seed.nk» содержит начальное число NKey (закрытый ключ). Вот пример:

$ more seed.nk
SUACSSL3UAHUDXKFSNVUZRF5UHPMWZ6BFDTJ7M6USDXIEDNPPQYYYCU3VY

Наконец, можно указать общедоступный ключ NKey и обратный вызов подписи. Открытый ключ будет отправлен на сервер, а предоставленный обратный вызов отвечает за подписание одноразового номера сервера. Когда сервер получает подписанный одноразовый номер, он может проверить, правильно ли он подписан, используя предоставленный открытый ключ.

/*
 * myPublicKey is the user's public key, which will be sent to the server.
 * mySignatureCB is a callback that is presented with a nonce and is
 * responsible for returning the signature for this nonce.
 * An optional closure can be specified.
 */
s = natsOptions_SetNKey(opts, myPublicKey, mySignatureCb, NULL);
if (s == NATS_OK)
    s = natsConnection_Connect(&nc, opts);

Обратный вызов подписи может использовать любую криптографическую библиотеку для подписи одноразового номера, а также nats_Sign()функция.

natsStatus
mySignatureCb(
    char            **customErrTxt,
    unsigned char   **signature,
    int             *signatureLength,
    const char      *nonce,
    void            *closure)
{
    // This approach requires to provide the seed (private key).
    // Hardcoding it in the application (like in this example) may not be really safe.
    return nats_Sign(
        "SUACSSL3UAHUDXKFSNVUZRF5UHPMWZ6BFDTJ7M6USDXIEDNPPQYYYCU3VY",
        nonce, signature, signatureLength);
}

Вы можете подписать любой контент и получить подпись взамен. Соединение должно быть создано с natsOptions_SetUserCredentialsFromFiles()вариант, чтобы это работало.

    s = natsOptions_Create(&opts);
    if (s == NATS_OK)
        s = natsOptions_SetUserCredentialsFromFiles(opts, "user.creds", NULL);
    if (s == NATS_OK)
        s = natsConnection_Connect(&nc, opts);

    // Sign some arbitrary content
    const unsigned char *content   = (const unsigned char*) "hello";
    int                 contentLen = 5;
    unsigned char       sig[64];

    s = natsConnection_Sign(nc, content, contentLen, sig);
    if (s == NATS_OK)
    {
        // Do something with signature...
    }

Расширенное использование

Сброс соединения гарантирует, что любые буферизованные данные будут сброшены (отправлены) на сервер NATS.

// Flush connection to server, returns when all messages have been processed.
natsConnection_Flush(nc);
printf("All clear!\n");

// Same as above but with a timeout value, expressed in milliseconds.
s = natsConnection_FlushTimeout(nc, 1000);
if (s == NATS_OK)
    printf("All clear!\n");
else if (s == NATS_TIMEOUT)
    printf("Flushed timed out!\n");
else
    printf("Error during flush: %d - %s\n", s, natsStatus_GetText(s));

Автоматическая отмена подписки позволяет автоматически удалять подписку, когда подписчик получил заданное количество сообщений. Это используется внутри natsConnection_Request()вызов.

// Auto-unsubscribe after 100 messages received
natsConnection_Subscribe(&sub, nc, "foo", onMsg, NULL);
natsSubscription_AutoUnsubscribe(sub, 100);

Подписки можно слить. Это гарантирует, что интерес будет удален с сервера, но все сообщения, которые были помещены во внутреннюю очередь, обрабатываются.

// This call does not block.
natsSubscription_Drain(sub);

// If you want to wait for the drain to complete, call this
// and specify a timeout. Zero or negative to wait for ever.
natsSubscription_WaitForDrainCompletion(sub, 0);

Соединения можно слить. Этот процесс сначала переведет все зарегистрированные подписки в режим слива и предотвратит создание любой новой подписки. Когда все подписки истощаются, вызовы публикации истощаются (посредством сброса соединения), и новые вызовы публикации в этот момент завершатся сбоем. Затем соединение закрывается.

// Use default timeout of 30 seconds.
// But this call does not block. Use natsOptions_SetClosedCB() to be notified
// that the connection is closed.
natsConnection_Drain(nc);

// To specify a timeout for the operation to complete, after which the connection
// is forcefully closed. Here is an exampel of a timeout of 10 seconds (10,000 ms).
natsConnection_DrainTimeout(nc, 10000);

Вы можете иметь несколько соединений в своем приложении, смешивая подписчиков и издателей.

// Create a connection 'nc1' to host1
natsConnection_ConnectTo(&nc1, "nats://host1:4222");

// Create a connection 'nc2' to host2
natsConnection_ConnectTo(&nc2, "nats://host2:4222");

// Create a subscription on 'foo' from connection 'nc1'
natsConnection_Subscribe(&sub, nc1, "foo", onMsg, NULL);

// Uses connection 'nc2' to publish a message on subject 'foo'. The subscriber
// created previously will receive it through connection 'nc1'.
natsConnection_PublishString(nc2, "foo", "hello");

Использование natsOptionsпозволяет указать параметры, используемые natsConnection_Connect()вызов. Обратите внимание, что natsOptionsобъект, который передается этому вызову, клонируется, что означает, что любые изменения, внесенные в объект параметров, не окажут никакого влияния на подключенное соединение.

natsOptions *opts = NULL;

// Create an options object
natsOptions_Create(&opts);

// Set some properties, starting with the URL to connect to:
natsOptions_SetURL(opts, "nats://host1:4222");

// Set a callback for asynchronous errors. This is useful when having an asynchronous
// subscriber, which would otherwise have no other way of reporting an error.
natsOptions_SetErrorHandler(opts, asyncCb, NULL);

// Connect using those options:
natsConnection_Connect(&nc, opts);

// Destroy the options object to free memory. The object was cloned by the connection,
// so the options can be safely destroyed.
natsOptions_Destroy(opts);

Как мы видели, все обратные вызовы имеют void *closureпараметр. Это полезно, когда обратному вызову нужно выполнить какую-то работу и нужна ссылка на какой-то объект. При настройке обратного вызова вы можете указать указатель на этот объект.

// Our object definition
typedef struct __Errors
{
    int count;

} Errors;

(...)

int
main(int argc, char **argv)
{
    // Declare an 'Errors' object on the stack.
    Errors asyncErrors;

    // Initialize this object
    memset(&asyncErrors, 0, sizeof(asyncErrors);

    // Create a natsOptions object.
    (...)

    // Set the error callback, and pass the address of our Errors object.
    natsOptions_SetErrorHandler(opts, asyncCb, (void*) &asyncErrors);

    // Create the connection and subscriber.
    (...)

    // Say that we are done subscribing, we could check the number of errors:
    if (asyncErrors.count > 1000)
    {
        printf("That's a lot of errors!\n");
    }

    (...)
}

Обратный вызов будет использовать замыкание следующим образом:

static void
asyncCb(natsConnection *nc, natsSubscription *sub, natsStatus err, void *closure)
{
    Errors *errors = (Errors*) closure;

    printf("Async error: %d - %s\n", err, natsStatus_GetText(err));

    errors->count++;
}

Это то же самое для всех других обратных вызовов, используемых в библиотеке C NATS.

Библиотека может автоматически переподключаться к серверу NATS, если соединение разрывается. Однако само начальное соединение завершится ошибкой, если сервер недоступен в данный момент. время подключения. Была добавлена ​​опция, чтобы соединение вело себя как повторное подключение, используя попытки повторного подключения и ожидание:

    s = natsOptions_SetMaxReconnect(opts, 5);
    if (s == NATS_OK)
        s = natsOptions_SetReconnectWait(opts, 1000);

    // Instruct the library to block the connect call for that
    // long until it can get a connection or fails.
    if (s == NATS_OK)
        s = natsOptions_SetRetryOnFailedConnect(opts, true, NULL, NULL);

    // If the server is not running, this will block for about 5 seconds.
    s = natsConnection_Connect(&conn, opts);

Вы можете сделать соединение асинхронным (если оно не может подключиться немедленно) с помощью передача обработчика соединения:

    s = natsOptions_SetMaxReconnect(opts, 5);
    if (s == NATS_OK)
        s = natsOptions_SetReconnectWait(opts, 1000);
    if (s == NATS_OK)
        s = natsOptions_SetRetryOnFailedConnect(opts, true, connectedCB, NULL);

    // Start the connect. If no server is running, it should return NATS_NOT_YET_CONNECTED.
    s = natsConnection_Connect(&conn, opts);
    printf("natsConnection_Connect call returned: %s\n", natsStatus_GetText(s));

    // Connection can be used to create subscription and publish messages (as
    // long as the reconnect buffer is not full).

Проверьте пример examples/connect.cдля большего количества вариантов использования.

Вы также можете указать крайний срок записи, что означает, что когда библиотека пытается отправлять байты на сервер NATS, если соединение неработоспособно, но о нем не сообщалось как закрытые, вызовы завершатся с ошибкой NATS_TIMEOUTошибка. Сокет будет закрыт и библиотека попытается повторно подключиться (если не отключена). Обратите внимание, что это может также происходит в том случае, если сервер потребляет недостаточно быстро.

    // Sets a write deadline of 2 seconds (value is in milliseconds).
    s = natsOptions_SetWriteDeadline(opts, 2000);

Кластерное использование

static char *servers[] = { "nats://localhost:1222",
                           "nats://localhost:1223",
                           "nats://localhost:1224"};

// Setup options to include all servers in the cluster.
// We first created an options object, and pass the list of servers, specifying
// the number of servers on that list.
natsOptions_SetServers(opts, servers, 3);

// We could also set the amount to sleep between each reconnect attempt (expressed in
// milliseconds), and the number of reconnect attempts.
natsOptions_SetMaxReconnect(opts, 5);
natsOptions_SetReconnectWait(opts, 2000);

// We could also disable the randomization of the server pool
natsOptions_SetNoRandomize(opts, true);

// Setup a callback to be notified on disconnects...
natsOptions_SetDisconnectedCB(opts, disconnectedCb, NULL);

// And on reconncet
natsOptions_SetReconnectedCB(opts, reconnectedCb, NULL);

// This callback could be used to see who we are connected to on reconnect
static void
reconnectedCb(natsConnection *nc, void *closure)
{
    // Define a buffer to receive the url
    char buffer[64];

    buffer[0] = '\0';

    natsConnection_GetConnectedUrl(nc, buffer, sizeof(buffer));
    printf("Got reconnected to: %s\n", buffer);
}

Использование библиотеки циклов событий

Для каждого соединения NATSбиблиотека создает поток, читающий данные из сокета. Публикация данных приводит к добавлению данных в буфер, который «сбрасывается» из обратного вызова таймера или остается на месте, когда буфер достигает определенного размера. Сброс означает, что мы пишем в сокет (и сокет находится в режиме блокировки).

Если в вашем процессе запущено несколько соединений, количество потоков увеличится (поскольку каждое соединение использует поток для получения данных из сокета). Если это становится проблемой или если вы уже используете библиотеку уведомлений о событиях, вы можете указать NATSlibrary, чтобы использовать эту библиотеку событий вместо использования потока для чтения и прямой записи в сокет при публикации данных.

Это работает путем установки цикла событий и различных обратных вызовов через natsOptions_SetEventLoop()API. В зависимости от используемого вами цикла событий у вас есть дополнительные вызовы API. API находится в adaptersкаталог и задокументирован.

Мы предоставляем адаптеры для двух библиотек уведомлений о событиях: libevent и libuv .

// Create an event loop object
uv_loop_t *uvLoop = uv_default_loop();

// Set it into an options object
natsOptions_SetEventLoop(opts,
                         (void*) uvLoop,
                         natsLibuv_Attach,
                         natsLibuv_Read,
                         natsLibuv_Write,
                         natsLibuv_Detach);

// Connect (as usual)
natsConnection_Connect(&conn, opts);

// Subscribe (as usual)
natsConnection_Subscribe(&sub, conn, subj, onMsg, NULL);

// Run the event loop
uv_run(uvLoop, UV_RUN_DEFAULT);

Обратный вызов onMsgкоторые вы зарегистрировали, будут активированы, как обычно, когда данные станут доступны.

Сложности возникают при публикации данных. Действительно, публикация — это просто помещение данных в буфер, и именно библиотека событий будет уведомлять об обратном вызове о необходимости выполнения записи в сокет. Для этого цикл событий должен быть «работающим».

Поэтому, если вы публикуете из потока, в котором выполняется цикл событий, вам нужно «запускать» цикл после каждого (или нескольких) вызовов публикации, чтобы данные действительно были отправлены. В качестве альтернативы вы можете опубликовать из потока, отличного от потока, выполняющего цикл обработки событий.

Вышеупомянутое важно иметь в виду в отношении вызовов, которые выполняют запрос-ответ. Они не должны создаваться из потока, выполняющего цикл обработки событий. Вот пример таких звонков:

natsConnection_Request()
natsConnection_Flush()
natsConnection_FlushTimeout()
...

Действительно, поскольку эти вызовы публикуют данные и ждут «ответа», если вы выполняете их в потоке цикла событий (или пока цикл не «выполняется»), данные не будут отправлены. Звонки не получат ответ и тайм-аут.

За natsConnection_Request(), использовать natsConnection_PublishRequest()вместо этого и зарегистрировать подписчика на ответ.

Для других должна быть доступна асинхронная версия этих вызовов.

Смотрите примеры в examplesкаталог для полного использования.

часто задаваемые вопросы

Вот некоторые из часто задаваемых вопросов:

С чего начать?

Существует несколько ресурсов, которые помогут вам начать работу с клиентской библиотекой NATS C.
То examples/getstartedкаталог содержит очень простые программы, которые используют библиотека для простых функций, таких как отправка сообщения или настройка подписки.
То examplesсам каталог содержит более подробные примеры, включающие ошибку обработка и более продвинутые API. Вы также найдете примеры, показывающие использование клиентской библиотеки NATS C и циклов внешних событий.

Как насчет поддержки платформы XYZ?

Мы поддерживаем платформы, доступные нам для разработки и тестирования. В настоящее время это ограничено Linux, macOS и Windows. Даже тогда могут быть версии ОС, которые у вас могут быть проблемы со сборкой, и мы с радостью примем PR, чтобы исправить процесс сборки, если он не ломает тех, кого мы поддерживаем!

Как мне построить?

Мы используем cmake, поскольку он позволяет создавать кроссплатформенные сборки. Это работает для нас. Вы можете свободно создайте свой собственный make-файл или решение для Windows. Если вы хотите использовать cmake, следуйте этим инструкции .

Я нашел ошибку в вашей библиотеке, что мне делать?

Пожалуйста, сообщите о проблеме здесь . Дайте нам столько как возможную информацию о том, как вы можете воспроизвести это. Если у вас есть исправление для этого, вы можете также открыть PR.

Является ли библиотека потокобезопасной?

Все вызовы используют внутреннюю блокировку там, где это необходимо. Как пользователь, вам нужно будет сделать свою собственную блокировку если бы вы использовали один и тот же обратный вызов для разных подписчиков (поскольку обратный вызов вызываться из разных потоков для каждого подписчика).
Обратите внимание, что это верно для любого типа обратного вызова, существующего в библиотеке NATS C, например соединение или обработчики ошибок и т. д.. если вы укажете тот же обратный вызов, вы рискуете, что код в этом обратном вызове может выполняться из разных внутренних потоков.

Какова поточная модель библиотеки?

Библиотека использует некоторые потоки для обработки внутренних таймеров или отправки асинхронных ошибок. например. Вот список потоков, созданных в результате создания пользователем NATS. объекты:

  • Каждое соединение имеет поток для чтения данных из сокета. Если вы используете внешний цикл событий, этот поток не создается.

  • У каждого соединения есть поток, отвечающий за очистку исходящего буфера. Если вы создадите связь с natsOptions_SetSendAsap()параметр, этот поток не создается, так как любые исходящие данные сбрасываются сразу.

  • Каждая асинхронная подписка имеет поток, используемый для отправки сообщений обратному вызову пользователя. Если вы используете nats_SetMessageDeliveryPoolSize(), глобальный пул потоков (из size, указанный в качестве параметра этой функции), используется вместо потока для каждой асинхронной подписки.

Как отправить двоичные данные?

NATS — это текстовый протокол, в котором полезная нагрузка сообщения представляет собой массив байтов. Сервер никогда не интерпретирует содержание сообщения.

API natsConnection_Publish() принимает указатель на память, а пользователь указывает, сколько байты из этого места должны быть отправлены. natsConnection_PublishString() добавлен для удобство, если вы хотите отправить строки, но на самом деле это эквивалентно вызову natsConnection_Publish() с strlenна количество байт.

Данные отправляются на месте или из другого потока?

Из соображений пропускной способности (и для имитации клиента Go, на котором основана эта библиотека), клиентская библиотека использует буфер для всех записей сокетов. Этот буфер очищается в потоке, в котором происходит публикация. если буфер заполнен. Размер буфера можно настроить с помощью natsOptions_SetIOBufSize(). Вы можете запросить, как много данных находится в этом буфере с помощью функции natsConnection_Buffered().

Когда вызов публикации не заполняет буфер, вызов возвращается без фактически отправленных данных. на сервер. Выделенный поток (поток очистки) автоматически очищает этот буфер. Этот помогает с пропускной способностью, поскольку количество системных вызовов уменьшается, а количество байтов отправлено сразу, выше, однако это может увеличить задержку для ситуаций запроса/ответа, когда один хочет отправить одно сообщение за раз. Для этого вызов natsConnection_Request() сбрасывает буфер на месте и не зависит от потока флешера.

Опцию natsOptions_SetSendAsap() можно использовать для принудительной отправки всех вызовов публикации из соединения. создан с этой опцией, чтобы сбрасывать буфер сокета при каждом вызове и не добавлять задержку, полагаясь на промывочной резьбе.

Вызов публикации не вернул ошибку, гарантированно ли сообщение будет отправлено подписчику?

Нет! Даже не гарантируется, что сервер получил это сообщение. Как описано выше, сообщение может просто находиться в буфере соединения, даже если каждый вызов публикации очищает буфер сокета, после этого вызов возвращается. Нет обратной связи от сервера, что он действительно обработал это сообщение. Сервер мог рухнуть после чтения из сокета.

Независимо от того, получил сервер сообщение или нет, между публикация и подписка. Если издателю нужно знать, что его сообщение было получено и обрабатывается подписавшимся приложением, следует использовать шаблон запроса/ответа. Проверять natsConnection_Request() и natsConnection_PublishRequest() для более подробной информации.

Нужно ли везде вызывать natsConnection_Flush()?

Эта функция не просто очищает буфер сокета, а отправляет PINGпротокол сообщение на сервер и получает PONGобратно синхронно.

Как описано ранее, если вы хотите сбросить буфер сокета, чтобы уменьшить задержку во всех публиковать звонки, вы должны создать соединение с опцией «отправить как можно скорее».

API natsConnection_Flush() часто используется для проверки того, что сервер обработал один из протокольные сообщения.

Например, создание подписки является асинхронным. Когда вызов вернется, вы можете получите сообщение об ошибке, если соединение было ранее закрыто, но вы не получите сообщение об ошибке, если ваш Например, у пользователя подключения нет разрешения на создание этой подписки.

Вместо этого сервер отправляет сообщение об ошибке, которое асинхронно принимается клиентской библиотекой. Вызов natsConnection_Flush() для того же соединения, которое создало подписку, гарантирует что сервер обработал подписку и, если произошла ошибка, отправил эту ошибку обратно перед PONG. Затем можно проверить natsConnection_GetLastError(). чтобы выяснить, была ли подписка успешно зарегистрирована или нет.

Как данные и протоколы поступают с сервера?

При создании соединения создается библиотечный поток для чтения протоколов (включая сообщения) из розетки. Вам не нужно ничего делать в этом отношении. Когда данные считываются из сокета он будет преобразован в протоколы или сообщения и распределен по соответствующим обратным вызовам.

Многие вещи асинхронны, как я узнаю, есть ли ошибка?

Вы должны настроить обратные вызовы ошибок, чтобы получать уведомления при возникновении асинхронных ошибок. Это может быть устанавливается с помощью natsOptions. Например, проверьте natsOptions_SetErrorHandler(). если ты зарегистрировать обратный вызов ошибки для соединения, в случае возникновения ошибки ваш зарегистрированный обработчик ошибок будет вызываться.

Отправляются ли сообщения из асинхронной подписки параллельно?

При создании асинхронной подписки библиотека создаст поток, отвечающий за для отправки сообщений для этой подписки. Сообщения для данной подписки отправляются последовательно. То есть ваш обратный вызов вызывается с одним сообщением за раз и только после обратный вызов возвращает, что библиотека вызывает его снова со следующим ожидающим сообщением.

Таким образом, для каждого соединения существует поток, сбрасывающий данные из сокета, а затем передаются сообщения. в соответствующий поток подписки, который затем отвечает за их отправку.

Если вы планируете иметь много подписок и уменьшить количество потоков, используемых библиотекой, вам следует рассмотреть возможность использования пула потоков на уровне библиотеки, который позаботится об отправке сообщений. См. nats_SetMessageDeliveryPoolSize(). Подписка будет использовать пул библиотек, если соединение, из которого оно было создано, имело для параметра natsOptions_UseGlobalMessageDelivery() значение true.

Даже при использовании глобального пула сообщения из данной подписки всегда отправляются последовательно и из одного потока.

Что такое ошибка SLOW CONSUMER, которую я вижу на сервере?

Сервер при отправке данных клиентскому (или маршрутному) соединению устанавливает крайний срок записи. То есть, если сокет блокирует запись на это время, он просто закроет соединение.

Эта ошибка возникает, когда клиент (или другой сервер) недостаточно быстро считывает данные с разъем. Как мы видели ранее, клиентское соединение создает поток, задачей которого является чтение данные из сокета, анализировать их и перемещать протокол или сообщения в соответствующие обработчики.

Так что на самом деле это не является признаком того, что обработчик сообщений обрабатывает сообщения слишком медленно, вместо этого это, вероятно, результат проблем с ресурсами (недостаточно циклов ЦП для чтения из сокета или недостаточно быстрое чтение) или конфликт внутренних блокировок, препятствующий потоку чтение из сокета для чтения данных достаточно быстро, потому что оно заблокировано при попытке получить некоторые замок.

В чем тогда ошибка SLOW CONSUMER в клиенте?

Эта ошибка, в отличие от ошибки, сообщаемой сервером, связана с отправкой сообщений обратному вызову пользователя. Как объяснялось, сообщения перемещаются из чтение потока из сокета в поток подписки, ответственный за диспетчеризацию.

Внутренняя очередь подписки ограниченного размера по умолчанию. Когда нить соединения не может добавить сообщение в эту очередь, потому что она заполнена, сообщение будет удалено, и если установлен обработчик ошибок, там будет размещено сообщение.

Например, наличие обработчика сообщений, который требует слишком много времени для обработки сообщения, может вызвать ошибку медленного клиента-потребителя, если скорость входящих сообщений высока и/или лимиты ожидания подписки недостаточно велики.

API natsSubscription_SetPendingLimits() можно использовать для установки внутреннего ограничения подписки. лимиты очереди. значения -1для количества и/или размера означает, что соответствующая метрика будет не быть проверенным. Установка обоих на -1означает, что клиентская библиотека будет ставить в очередь все входящие сообщений, независимо от того, с какой скоростью они отправляются, что может привести к тому, что ваше приложение использовать много памяти.

Что произойдет, если сервер перезагрузится или произойдет дисконнект?

По умолчанию библиотека попытается переподключиться. Параметры повторного подключения могут быть установлены либо полностью отключить логику повторного подключения или установить количество попыток и задержку между попытками. См. natsOptions_SetAllowReconnect(), natsOptions_SetMaxReconnect() и т. д. для получения дополнительной информации.

Если вы не установили для natsOptions_SetNoRadomize() значение true, то список заданных URL-адресов рандомизируется. при создании соединения. Когда происходит отключение, пробуется следующий URL-адрес. Если это не удастся, библиотека переходит к следующему. Когда все испробовано, цикл перезапускается с первого до все они были опробованы, максимальное количество попыток повторного подключения определено с помощью параметров. Библиотека будет приостанавливаться на время, указанное в параметрах, при попытке переподключиться к серверу, на котором он был ранее подключался к.

Если вы подключаетесь к серверу, на котором включена реклама URL-адреса подключения (по умолчанию для серверов 0.9.2+), когда серверы добавляются в кластер серверов NATS, сервер будет отправлять URL-адреса этого новый сервер своим клиентам. Это увеличивает «пул» или URL-адреса, которые могли быть связаны с соединением. создается с помощью и позволяет библиотеке повторно подключаться к серверам, которые были добавлены в кластер.

Пока библиотека отключена от сервера, все вызовы публикации или новой подписки буферизуется в памяти. Этот буфер имеет размер по умолчанию, но его можно настроить. Когда этот буфер полная, дальнейшая публикация или новые вызовы подписки будут сообщать об ошибках.
Когда клиентская библиотека повторно подключится к серверу, ожидающий буфер будет быть сброшены на сервер.

Если вашему приложению необходимо знать, произошло ли отключение или повторное подключение библиотеки, вы должны снова установить некоторые обратные вызовы, чтобы получать уведомления о таких событиях. Используйте natsOptions_SetDisconnectedCB(), natsOptions_SetReconnectedCB() и natsOptions_SetClosedCB(). Обратите внимание, что последний является окончательным событие для соединения. Когда вызывается этот обратный вызов, соединение становится недействительным, т.е. то есть вы больше не будете получать или отправлять данные через это подключение.

Нужно ли освобождать объекты NATS?

Все объекты, которые вы создаете и которые имеют _Destroy()API действительно должен быть уничтожен если вы хотите не пропускать память. Один из важных и часто пропускаемых natsMsg_Destroy()тот должен быть вызван в обработчике сообщения после того, как вы закончите обработку сообщения. сообщение было создано библиотекой и передано вам в обработчик сообщений, но вы ответственный за звонок natsMsg_Destroy().

Что делает nats_ReleaseThreadMemory()?

Библиотека NATS C может хранить объекты, используя хранилище локальных потоков. Потоки, которые создаются и однако, обрабатываемые библиотекой, автоматически очищаются, если пользователь создает поток и вызывает некоторые API NATS, есть вероятность, что библиотека что-то сохранила в этом локальное хранилище потока. Когда поток завершается, пользователь должен вызвать эту функцию, чтобы библиотека может уничтожить объекты, которые она могла хранить.

Клиент потоковой передачи NATS

Потоковое основное использование

Обратите внимание, что проверка ошибок игнорируется для ясности. Проверить examples/stanкаталог для полного использования.

// Connect without options
stanConnection_Connect(&sc, cluster, clientID, NULL);

// Simple Synchronous Publisher.
// This does not return until an ack has been received from NATS Streaming
stanConnection_Publish(sc, "foo", (const void*) "hello", 5);

// Simple Async Subscriber
stanConnection_PublishAsync(sc, "foo", (const void*) "hello", 5, _pubAckHandler, NULL);

// Simple Subscription without options (default to non durable, receive new only)
stanConnection_Subscribe(&sub, sc, "foo", onMsg, NULL, NULL);

// Unsubscribe (note that for non durable subscriptions, Unsubscribe() and Close() are the same
stanSubscription_Unsubscribe(sub);

// Close connection
stanConnection_Close(sc);

Потоковые подписки

Подписки NATS Streaming аналогичны подпискам NATS, но клиенты могут начинать подписку в более ранней точке потока сообщений, что позволяет им получать сообщения, которые были опубликованы до того, как этот клиент зарегистрировал интерес.

Варианты описаны с примерами ниже:

// Create a Subscription Options:
stanSubOptions *subOpts = NULL;

stanSubOptions_Create(&subOpts);

// Subscribe starting with most recently published value
stanSubOptions_StartWithLastReceived(subOpts);

// OR: Receive all stored messages
stanSubOptions_DeliverAllAvailable(subOpts);

// OR: Receive messages starting at a specific sequence number
stanSubOptions_StartAtSequence(subOpts, 22);

// OR: Start at messages that were stored 30 seconds ago. Value is expressed in milliseconds.
stanSubOptions_StartAtTimeDelta(subOpts, 30000);

// Create the subscription with options
stanConnection_Subscribe(&sub, sc, "foo", onMsg, NULL, subOpts);

Потоковые долговечные подписки

Воспроизведение сообщений обеспечивает большую гибкость для клиентов, желающих начать обработку в какой-то более ранней точке потока данных. Однако некоторым клиентам просто нужно продолжить с того места, где они остановились в предыдущем сеансе, без необходимости вручную отслеживать свое положение в потоке сообщений. Долгосрочные подписки позволяют клиентам назначать долговременное имя подписке при ее создании. Это приводит к тому, что сервер потоковой передачи NATS отслеживает последнее подтвержденное сообщение для этого идентификатора клиента + надежное имя, так что клиенту будут доставлены только сообщения, начиная с последнего подтвержденного сообщения.

stanConnection_Connect(&sc, "test-cluster", "client-123", NULL);

// Create subscription options
stanSubOptions_Create(&subOpts);

// Set a durable name
stanSubOptions_SetDurableName(subOpts, "my-durable");

// Subscribe
stanConnection_Subscribe(&sub, sc, "foo", onMsg, NULL, subOpts);
...
// client receives message sequence 1-40
...
// client disconnects for an hour
...
// client reconnects with same clientID "client-123"
stanConnection_Connect(&sc, "test-cluster", "client-123", NULL);

// client re-subscribes to "foo" with same durable name "my-durable"
stanSubOptions_Create(&subOpts);
stanSubOptions_SetDurableName(subOpts, "my-durable");
stanConnection_Subscribe(&sub, sc, "foo", onMsg, NULL, subOpts);
...
// client receives messages 41-current

Группы потоковой очереди

Все подписки с одинаковым именем очереди (независимо от соединения они происходят из) образуют группу очереди. Каждое сообщение будет доставлено только одному подписчику на группу очереди, используя семантику очередей. Вы можете иметь столько групп очередей, сколько пожелаете.

Обычные подписчики продолжат работать как положено.

Создание группы очереди

Группа очереди создается автоматически, когда первый подписчик очереди созданный. Если группа уже существует, участник добавляется в группу.

stanConnection_Connect(&sc, "test-cluster", "clientid", NULL);

// Create a queue subscriber on "foo" for group "bar"
stanConnection_QueueSubscribe(&qsub1, "foo", "bar", onMsg, NULL, NULL);

// Add a second member
stanConnection_QueueSubscribe(&qsub2, "foo", "bar", onMsg, NULL, NULL);

// Notice that you can have a regular subscriber on that subject
stanConnection_Subscribe(&sub, sc, "foo", onMsg, NULL, NULL);

// A message on "foo" will be received by sub and qsub1 or qsub2.

Начальная позиция

Обратите внимание, что после формирования группы очереди начальная позиция участника игнорируется. при добавлении в группу. Он начнет получать сообщения с последнего положение в группе.

Предположим, что канал fooсуществует и есть 500сохраненные сообщения, группа barуже создан, есть два члена и последний последовательность отправленных сообщений 100. Добавляется новый участник. Обратите внимание на его начальное положение:

stanSubOptions_Create(&subOpts);
stanSubOptions_StartAtSequence(subOpts, 200);

stanConnection_QueueSubscribe(&qsub, "foo", "bar", onMsg, NULL, subOpts);

Это не приведет к ошибке, но начальная позиция будет проигнорирована. Предполагая этот участник будет тем, кто получит следующее сообщение, он получит сообщение последовательность 101.

Выход из группы

Выход из группы возможен двумя способами: закрыть соединение абонента или вызов Unsubscribe:

// Have qsub leave the queue group
stanSubscription_Unsubscribe(qsub);

Если у покидающего участника были неподтвержденные сообщения, эти сообщения переназначаются остальным членам.

Закрытие группы очереди

Специального API для этого нет. После того, как все участники уйдут (либо позвонив Unsubscribe, или их соединения закрыты), группа удаляется с сервера.

Следующий звонок в QueueSubscribeс тем же именем группы создаст новую группу, то есть начальная позиция вступит в силу, и доставка начнется оттуда.

Потоковые группы устойчивых очередей

Как описано выше, для непостоянных подписчиков очереди, когда последний член покидает группу, эта группа удалена. Надежная группа очереди позволяет оставить всех участников, но при этом поддерживать состояние. Когда участник повторно присоединяется, он начинает с последней позиции в этой группе.

Создание группы устойчивых очередей

Постоянная группа очередей создается так же, как и стандартная группа очередей. кроме DurableNameОпция должна использоваться для указания долговечности.

stanSubOptions_Create(&subOpts);
stanSubOptions_SetDurableName(subOpts, "dur");

stanConnection_QueueSubscribe(&qsub, "foo", "bar", onMsg, NULL, subOpts);

Группа под названием dur:bar(объединение долговременного имени и имени группы) создается в сервер. Это означает две вещи:

  • Персонаж :не допускается для долговременного имени подписчика очереди.
  • Постоянные и временные группы очередей с одинаковыми именами могут сосуществовать.
// Non durable queue subscriber on group "bar"
stanConnection_QueueSubscribe(&qsub, "foo", "bar", onMsg, NULL, NULL);

// Durable queue subscriber on group "bar"
stanSubOptions_Create(&subOpts);
stanSubOptions_SetDurableName(subOpts, "mydurablegroup");
stanConnection_QueueSubscribe(&qsub, "foo", "bar", onMsg, NULL, subOpts);

// The same message produced on "foo" would be received by both queue subscribers.

Начальная позиция группы устойчивых очередей

Правила для неустойчивых подписчиков очереди применяются к постоянным подписчикам.

Выход из группы устойчивых очередей

Что касается подписчиков с неустойчивой очередью, если соединение участника закрыто или если Unsubscribeего вызывают, участник покидает группу. Любое неподтвержденное сообщение передается оставшимся участникам. См. Закрытие группы для важных отличий с подписчиками с неустойчивой очередью.

Закрытие группы устойчивых очередей

Звонит последний участник Unsubscribeзакроет (то есть разрушит) группа. Поэтому, если вы хотите сохранить устойчивость группы, вам не следует вызов Unsubscribe.

Таким образом, в отличие от неустойчивых подписчиков очереди, можно поддерживать группу очереди. без члена на сервере. Когда новый участник повторно присоединяется к группе устойчивых очередей, он возобновится с того места, откуда группа ушла, фактически сначала получая все неподтвержденные сообщения, которые могли быть оставлены, когда последний участник ранее ушел.

Потоковые подписки с подстановочными знаками

Подписки NATS Streaming не поддерживают подстановочные знаки.

Потоковое расширенное использование

Состояние подключения

Тот факт, что сервер потоковой передачи NATS и клиенты не подключены напрямую, создает проблему, когда нужно узнать, действителен ли клиент. Когда клиент отключается, сервер потоковой передачи не уведомляется, поэтому важно stanConnection_Close(). Сервер отправляет пульсации в личный почтовый ящик клиента, и если он пропустит определенное количество ответов, он будет считать соединение клиента потерянным и удалит его от своего состояния.

Зачем нужны пинги? Представьте себе случай, когда клиент подключается к серверу NATS, у которого есть маршрут к серверу потоковой передачи NATS (либо на автономный сервер NATS или на встроенный в него сервер). Если соединение между сервером потоковой передачи и сервером NATS клиента разорвано, NATS-соединение клиента все еще будет в порядке, однако связь с сервером потоковой передачи невозможна.

Начиная с потокового сервера NATS 0.10.2, клиентская библиотека будет отправлять PING через равные промежутки времени (по умолчанию 5 секунд). и закроет потоковое соединение после того, как определенное количество запросов PING будет отправлено без ответа (по умолчанию 3). Когда это случается, обратный вызов - если он зарегистрирован - будет вызван, чтобы уведомить пользователя о том, что соединение окончательно потеряно, и причина за провал.

Вот как вы могли бы указать свои собственные значения PING и обратный вызов:

stanConnOptions_Create(&connOpts);

// Send PINGs every 10 seconds, and fail after 5 PINGs without any response.
stanConnOptions_SetPings(connOpts, 10, 5);

// Add a callback to be notified if the STAN connection is lost for good.
stanConnOptions_SetConnectionLostHandler(connOpts, connectionLostCB, NULL);

// Here is an example of the `connectionLostCB`:
connectionLostCB(stanConnection *sc, const char *errTxt, void *closure)
{
    printf("Connection lost: %s\n", errTxt);
}

Обратите внимание, что единственный способ получить уведомление в вашем приложении — установить обратный вызов. Если обратный вызов не установлен, запросы PING все равно отправляются, а соединение будет закрыта при необходимости, но приложение не будет знать, есть ли у него только подписки. Обратный вызов по умолчанию используется для простого вывести в стандартную ошибку идентификатор кластера, идентификатор клиента и ошибку, вызвавшую потерю соединения:

Connection permanently lost: clusterID=test-cluster clientID=client error=connection lost due to PING failure

Когда соединение потеряно, вашему приложению придется заново создать его и все подписки, если таковые имеются.

Библиотека создает собственное соединение NATS и задает для количества попыток повторного подключения значение «бесконечно». Поэтому должно быть возможно чтобы библиотека всегда переподключалась, но это не значит, что потоковое соединение не будет закрыто, даже если вы установите очень высокий порог максимального исходящего значения PING. Имейте в виду, что пока клиент отключен, сервер отправляет сердцебиение также передается клиентам, и, если не будет получено никакого ответа, он удалит этот клиент из своего состояния. Когда связь восстановлена, PING, отправленные на сервер, позволят обнаружить это состояние и сообщить клиенту, что соединение теперь закрыто.

Кроме того, пока клиент «отключен» от сервера, другое приложение, подключенное к потоковому серверу, может connect и использует тот же идентификатор клиента. Сервер при обнаружении повторяющегося идентификатора клиента попытается связаться с первым клиентом. чтобы узнать, должен ли он отклонить запрос на подключение второго клиента. Поскольку связь между сервером и первый клиент не работает, сервер не получит ответа и поэтому заменит первого клиента вторым.

До потокового сервера NATS 0.10.2, если связь между первым клиентом и сервером должна быть восстановлена, и приложение будет отправлять сообщения, сервер примет их, потому что идентификатор клиента опубликованных сообщений будет действительный, хотя клиент не является. С сервером 0.10.2+, с каждым сообщением отправляется дополнительная информация, позволяющая сервер для отклонения сообщений от клиента, который был заменен другим клиентом.

Асинхронная публикация

Базовый API публикации ( stanConnection_Publish()) синхронный; он не возвращает управление вызывающей программе до тех пор, пока Сервер потоковой передачи NATS подтвердил получение сообщения. Для этого создается уникальный идентификатор (GUID) для сообщение о создании, а клиентская библиотека ожидает подтверждения публикации от сервера с соответствующим идентификатором GUID, прежде чем он возвращает управление вызывающей стороне, возможно, с ошибкой, указывающей, что операция не была успешной из-за какого-то сервера. проблема или ошибка авторизации.

Опытные пользователи могут захотеть обрабатывать эти подтверждения публикации вручную, чтобы достичь более высокой пропускной способности публикации, не ожидание отдельных подтверждений во время операции публикации. Для этой цели предоставляется API асинхронной публикации:

static void
_pubAckHandler(const char *guid, const char *error, void *closure)
{
    // Note: this callback can be invoked by different threads
    if (error != NULL)
        printf("pub ack for guid:%s error:%s\n", guid, error);
    else
        printf("Received ack for msg id %s\n", guid);
}

(...)

s = stanConnection_PublishAsync(sc, "foo", (const void*) "hello", 5, _pubAckHandler, NULL);
if (s != NULL)
{
    printf("Error on publish: %d - %s\n", s, natsStatus_GetText(s));
    nats_PrintLastErrorStack(stderr);
}

Если вы хотите сопоставить опубликованное сообщение с guid в обработчике подтверждения, вы должны передать уникальное замыкание в качестве последнего аргумента stanConnection_PublishAsync()вызов. Проверить examples/stan/pub-async.c файл для примера того, как это сделать.

Подтверждение сообщения и повторная доставка

NATS Streaming предлагает семантику доставки «по крайней мере один раз», что означает, что после того, как сообщение было доставлено правомочному подписчику, если подтверждение не получено в течение настроенного интервала времени ожидания, NATS Streaming попытается повторно доставить сообщение. Этот интервал времени ожидания определяется параметром подписки. stanSubOptions_SetAckWait(), который по умолчанию равен 30 секундам.

По умолчанию сообщения автоматически подтверждаются клиентской библиотекой потоковой передачи NATS после того, как обработчик сообщений подписчика вызывается. Однако могут быть случаи, когда подписавшийся клиент хочет ускорить или отложить подтверждение сообщения. Для этого клиент должен установить для подписки режим ручного подтверждения и вызвать stanSubscription_AckMsg(). бывший:

// Subscribe with manual ack mode, and set AckWait to 60 seconds
stanSubOptions_Create(&subOpts);
stanSubOptions_SetManualAckMode(subOpts, true);
stanSubOptions_SetAckWait(subOpts, 60000);
stanConnection_Subscribe(&sub, sc, "foo", onMsg, NULL, subOpts);

// In the callback
void
onMsg(stanConnection *sc, stanSubscription *sub, const char *channel, stanMsg *msg, void *closure)
{
	// ack message before performing I/O intensive operation
    stanSubscription_AckMsg(sub, msg);

	printf("Received a message on %s: %.*s\n",
        channel,
        stanMsg_GetDataLength(msg),
        stanMsg_GetData(msg));
}

Ограничение скорости/сопоставление

Классическая проблема обмена сообщениями типа «публикация-подписка» заключается в сопоставлении скорости производителей сообщений и скорости их потребителей. Производители сообщений часто могут опережать скорость подписчиков, потребляющих их сообщения. Это несоответствие обычно называют проблемой «быстрый производитель/медленный потребитель» и может привести к резким скачкам использования ресурсов. в базовой системе обмена сообщениями, поскольку она пытается буферизовать сообщения до тех пор, пока медленные потребители не смогут их догнать.

Ограничение скорости издателя

NATS Streaming предоставляет вариант подключения, который называется stanConnOptions_SetMaxPubAcksInflight()что эффективно ограничивает количество неподтвержденных сообщений, которые издатель может иметь в пути в любой момент времени. Когда этот максимум будет достигнут, далее опубликовать вызовы будут блокироваться до тех пор, пока количество неподтвержденных сообщений не упадет ниже указанного предела. бывший:

stanConnOptions_Create(&connOpts);
stanConnOptions_SetMaxPubAcksInflight(connOpts, 25, 1.0);
stanConnection_Connect(&sc, cluster, clientID, connOpts);

(...)
static void
_pubAckHandler(const char *guid, const char *error, void *closure)
{
    // process the ack
    ...
}

(...)

for (i = 1; i < 1000; i++)
{
    // If the server is unable to keep up with the publisher, the number of outstanding acks will eventually
    // reach the max and this call will block
    stanConnection_PublishAsync(sc, "foo", (const void*) "hello", 5, _pubAckHandler, NULL);
}

Обратите внимание на последний параметр stanConnOptions_SetMaxPubAcksInflight(). Это число с плавающей запятой, указывающее процент ожидающих подтверждения ACK упадет ниже, прежде чем будет разрешено отправлять больше сообщений. Например, если максимум 1000 и процент 50% (0,5), это означает, что если вызовы публикации должны быть заблокированы, потому что библиотека отправила 1000 сообщений и еще не получила ACK от сервера, вызовы публикации будут разблокированы только когда библиотека получила 500 ACK от сервера. Это предотвращает блокировку соединения и разблокируется для каждого сообщения, когда лимит был достигнут.

Ограничение абонентской платы

Ограничение скорости также может быть выполнено на стороне абонента для каждой подписки с использованием подписки. опция называется stanSubOptions_SetMaxInflight(). Этот параметр указывает максимальное количество незавершенных подтверждений. (сообщения, которые были доставлены, но не подтверждены), которые NATS Streaming позволит использовать для данной подписки. Когда этот предел будет достигнут, NATS Streaming приостановит доставку сообщений в эту подписку до тех пор, пока количество неподтвержденных сообщений падает ниже указанного предела. бывший:

// Subscribe with manual ack mode and a max in-flight limit of 25
stanSubOptions_Create(&subOpts);
stanSubOptions_SetManualAckMode(subOpts, true);
stanSubOptions_SetMaxInflight(subOpts, 25);
stanConnection_Subscribe(&sub, "foo", onMsg, NULL, subOpts);

// In the callback
void
onMsg(stanConnection *sc, stanSubscription *sub, const char *channel, stanMsg *msg, void *closure)
{
    printf("Received a message\n");
    ...
    // Does not ack, or takes a very long time to ack
    ...
    // Message delivery will suspend when the number of unacknowledged messages reaches 25
}

Однако сервер будет повторно доставлять сообщения, для которых он не получил подтверждения более переданное значение stanSubOptions_SetAckWait()(или 30 секунд по умолчанию).

Лицензия

Если не указано иное, исходные файлы NATS распространяются под лицензией Apache версии 2.0, найденной в файле LICENSE.

 

Добавить комментарий

Обратная связь

Интересуют вопросы реализации алгоритмов, программирования, выбора электроники и прочая информация, постараюсь осветить в отдельных статьях

пишите мне на netdm@mail.ru