Configuration

You specify CAP's configuration when you register its services in the DI container of an ASP.NET Core project.

services.AddCap(config=> 
{
    // config.XXX 
});
Here, services is an IServiceCollection, which is defined in the Microsoft.Extensions.DependencyInjection package.

What is the minimum configuration required for CAP?

You have to configure at least one transport and one storage. To get started quickly, you can use the following configuration:

services.AddCap(capOptions =>
{
    capOptions.UseInMemoryMessageQueue();  // Requires the Savorboard.CAP.InMemoryMessageQueue NuGet package.
    capOptions.UseInMemoryStorage();       // Requires the DotNetCore.CAP.InMemoryStorage NuGet package.
});

For specific transport and storage configuration, you can take a look at the configuration options provided by the specific components in the Transports section and the Persistent section.

Configuration in Subscribers

Subscribers use the [CapSubscribe] attribute to mark themselves as subscribers. They can be located in an ASP.NET Core Controller or Service.

When you declare [CapSubscribe], you can change the behavior of the subscriber by specifying the following parameters.

Name

string, required

Subscribe to messages by specifying the Name parameter, which corresponds to the name specified when publishing the message through _cap.Publish("Name").

This name corresponds to different items in different Brokers:

  • In RabbitMQ, it corresponds to the Routing Key.
  • In Kafka, it corresponds to the Topic.
  • In AzureServiceBus, it corresponds to the Subject.
  • In NATS, it corresponds to the Subject.
  • In RedisStreams, it corresponds to the Stream.
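As a minimal sketch of how Name ties a publisher to a subscriber, the controller below publishes and consumes the same message name. The controller, route, and the name "order.created" are hypothetical and chosen only for illustration.

```csharp
using System;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;

public class OrderController : Controller
{
    private readonly ICapPublisher _cap;

    public OrderController(ICapPublisher cap) => _cap = cap;

    [HttpPost("/orders")]
    public IActionResult Create()
    {
        // "order.created" is a hypothetical message name.
        _cap.Publish("order.created", DateTime.Now);
        return Ok();
    }

    // Subscribes to messages published under the same name.
    [NonAction]
    [CapSubscribe("order.created")]
    public void OnOrderCreated(DateTime createdAt)
    {
        Console.WriteLine($"Order created at {createdAt}");
    }
}
```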

Group

string, optional

Specify the Group parameter to place subscribers within a separate consumer group, a concept similar to consumer groups in Kafka. If this parameter is not specified, the current assembly name (DefaultGroupName) is used as the default.

Subscribers with the same Name but set to different groups will all receive messages. Conversely, if subscribers with the same Name are set to the same group, only one will receive the message.

It also makes sense for subscribers with different Names to be set to different groups; they can have independent threads for execution. Conversely, if subscribers with different Names are set to the same group, they will share consumption threads.

Group corresponds to different items in different Brokers:

  • In RabbitMQ, it corresponds to Queue.
  • In Kafka, it corresponds to Consumer Group.
  • In AzureServiceBus, it corresponds to Subscription Name.
  • In NATS, it corresponds to Queue Group.
  • In RedisStreams, it corresponds to Consumer Group.
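The grouping rules above can be sketched as follows. The class, the group names, and the message name are hypothetical; the class is assumed to be registered in the DI container so that CAP can discover it.

```csharp
using System;
using DotNetCore.CAP;

// A service-style subscriber; ICapSubscribe marks it for discovery by CAP.
public class OrderSubscribers : ICapSubscribe
{
    // Same Name, different groups: both methods receive every message.
    [CapSubscribe("order.created", Group = "billing")]
    public void Bill(DateTime createdAt)
    {
        Console.WriteLine($"billing: {createdAt}");
    }

    [CapSubscribe("order.created", Group = "shipping")]
    public void Ship(DateTime createdAt)
    {
        Console.WriteLine($"shipping: {createdAt}");
    }
}
```

If both methods used the same Group instead, only one of them would receive each message.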

GroupConcurrent

byte, optional

Set the parallelism of concurrent execution for subscribers by specifying the value of the GroupConcurrent parameter. Concurrent execution means that it needs to be on an independent thread, so if you do not specify the Group parameter, CAP will automatically create a Group using the value of Name.

Note: If you have multiple subscribers set to the same Group and also set the GroupConcurrent value for these subscribers, only the value set by the first subscriber will take effect.
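A short sketch of setting GroupConcurrent on a subscriber; the message name, group name, and the value 4 are illustrative assumptions, not recommendations.

```csharp
using System;
using DotNetCore.CAP;

public class ReportSubscribers : ICapSubscribe
{
    // Up to 4 messages of the "reports" group may be executed concurrently.
    // If another subscriber in the same group also sets GroupConcurrent,
    // only the first value found takes effect.
    [CapSubscribe("report.generate", Group = "reports", GroupConcurrent = 4)]
    public void Generate(string reportId)
    {
        Console.WriteLine($"Generating report {reportId}");
    }
}
```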

Custom configuration

CapOptions stores CAP's configuration. Every option has a default value, but you may sometimes need to customize them.

DefaultGroupName

Default: cap.queue.{assembly name}

The default consumer group name. It corresponds to a differently named item in each transport; customizing this value makes the resulting names easier to recognize in the transport.

Mapping

  • Queue Name in RabbitMQ.
  • Consumer Group Id in Apache Kafka.
  • Subscription Name in Azure Service Bus.
  • Queue Group Name in NATS.
  • Consumer Group in Redis Streams.

GroupNamePrefix

Default: Null

Adds a unified prefix to all consumer group names. See https://github.com/dotnetcore/CAP/pull/780

TopicNamePrefix

Default: Null

Adds a unified prefix to all topic/queue names. See https://github.com/dotnetcore/CAP/pull/780
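A minimal sketch of the naming options above. The values are hypothetical, and the exact way CAP composes the final names in the transport is not shown here.

```csharp
services.AddCap(options =>
{
    // Hypothetical values, for illustration only.
    options.DefaultGroupName = "orders-service";
    options.GroupNamePrefix = "staging";
    options.TopicNamePrefix = "staging";

    // Transport and storage configuration omitted.
});
```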

Versioning

Default: v1

Specifies a message version, which isolates the messages of different versions of a service. It is often used in A/B testing or when multiple service versions run side by side. The following scenarios call for versioning:

Business iteration and compatibility

Services iterate rapidly, so the data structure of a message is not fixed across service integrations. Sometimes we add or modify fields to accommodate new requirements. For a brand-new system this is not a problem, but if the system is already deployed to production and serving customers, new features can be incompatible with the old data structure when they go live, which can cause serious problems. Without versioning, the only workaround is to clear the message queues and persisted messages before starting the application, which is obviously not acceptable in production.

Multiple versions of the server

Sometimes the server needs to provide multiple sets of interfaces to support different versions of the app. The data structures exchanged between the server and these app versions may differ for the same interface, so the server usually provides different routing addresses for different app versions.

Using the same persistent table/collection across different instances

If you want multiple service instances to share the same database, versions prior to 2.4 let you isolate their tables by specifying different table names. From version 2.4 onward, this can be achieved through CAP configuration by setting different table name prefixes.

Check out the blog to learn more about the Versioning feature: https://www.cnblogs.com/savorboard/p/cap-2-4.html
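As a sketch, a new service version could isolate its messages from the old one like this. It assumes the option is exposed as the Version property on CapOptions; "v2" is an illustrative value.

```csharp
services.AddCap(options =>
{
    // Messages published and consumed by this instance are isolated
    // from instances that still run with the default "v1".
    options.Version = "v2";

    // Transport and storage configuration omitted.
});
```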

FailedRetryInterval

Default: 60 sec

If message transport fails during sending, CAP tries to send the message again. If the subscriber method fails during consumption, CAP tries to execute the method again. This option sets the interval between retries in both cases.

Retry & Interval

By default, if a failure occurs while sending or consuming, retrying starts after 4 minutes (FallbackWindowLookbackSeconds), in order to avoid possible problems caused by delays in setting the message state.
A failed send or consume is first retried 3 times immediately. After those 3 attempts, retries switch to polling, at which point the FailedRetryInterval setting takes effect.
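The retry-related options can be set together, as in the sketch below. The values are illustrative only; the intervals are in seconds.

```csharp
services.AddCap(options =>
{
    options.FailedRetryInterval = 30;            // poll for failed messages every 30 s (default 60)
    options.FailedRetryCount = 10;               // stop retrying after 10 attempts (default 50)
    options.FallbackWindowLookbackSeconds = 240; // keep the default 4-minute lookback window

    // Transport and storage configuration omitted.
});
```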

Multi-instance concurrent retries

Version 7.1.0 introduced database-based distributed locks to prevent multiple instances from concurrently fetching the same messages from the database for retry. To use them, you must explicitly set UseStorageLock to true.

UseStorageLock

Default: false

If set to true, CAP uses a database-based distributed lock to prevent the retry processes of multiple instances from fetching the same data concurrently. This creates the cap.lock table in the database.

CollectorCleaningInterval

Default: 300 sec

The interval at which the collector processor deletes expired messages.

ConsumerThreadCount

Default: 1

The number of consumer threads. When this value is greater than 1, the order of message execution cannot be guaranteed.

FailedRetryCount

Default: 50

The maximum number of retries. When this value is reached, CAP stops retrying. Set this option to change the maximum.

FallbackWindowLookbackSeconds

Default: 240 sec

Configures how far back the retry processor looks when picking up messages in Scheduled or Failed status.

FailedThresholdCallback

Default: NULL

Type: Action<FailedInfo>

Failure threshold callback. This action is called when the number of retries reaches the value set by FailedRetryCount. Specify it to receive a notification and intervene manually, for example by sending an email or another notification.
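A minimal sketch of a threshold callback that writes to standard error. It assumes that FailedInfo exposes MessageType and Message, and that the GetName() extension method from DotNetCore.CAP.Messages is available, as in recent CAP versions. A real application would more likely send an alert.

```csharp
using System;
using DotNetCore.CAP.Messages;

services.AddCap(options =>
{
    options.FailedRetryCount = 5;

    // Called once the retry count reaches FailedRetryCount.
    options.FailedThresholdCallback = failed =>
    {
        // MessageType, Message and GetName() are assumed members (see the note above).
        Console.Error.WriteLine(
            $"A message of type {failed.MessageType} failed after {options.FailedRetryCount} retries. " +
            $"Message name: {failed.Message.GetName()}");
    };

    // Transport and storage configuration omitted.
});
```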

SucceedMessageExpiredAfter

Default: 24*3600 sec (1 day)

The expiration time (in seconds) of successful messages. Once a message has been sent or consumed successfully, it is removed from database storage after SucceedMessageExpiredAfter seconds. Set this value to change the expiration time.

FailedMessageExpiredAfter

Default: 15*24*3600 sec (15 days)

The expiration time (in seconds) of failed messages. When sending or consuming a message fails, the message is removed from database storage after FailedMessageExpiredAfter seconds. Set this value to change the expiration time.
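The expiration options, together with the collector interval, can be sketched as follows. All values are in seconds and are illustrative only.

```csharp
services.AddCap(options =>
{
    options.SucceedMessageExpiredAfter = 6 * 3600;      // keep successful messages for 6 hours
    options.FailedMessageExpiredAfter = 30 * 24 * 3600; // keep failed messages for 30 days
    options.CollectorCleaningInterval = 600;            // run the cleanup every 10 minutes

    // Transport and storage configuration omitted.
});
```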

[Removed] UseDispatchingPerGroup

Default: false

Removed in version 8.2, where this became the default behavior.

If true, all consumers within the same group push received messages to their own dispatching pipeline channel. Each channel's thread count is set to the ConsumerThreadCount value.

[Obsolete] EnableConsumerPrefetch

Default: false (before version 7.0, the default was true)

Renamed to EnableSubscriberParallelExecute. Please use the new option.

EnableSubscriberParallelExecute

Default: false

If true, CAP prefetches a batch of messages from the broker into a buffer and then executes the subscriber method for them. When execution is done, it fetches the next batch.

Precautions

Setting this to true may cause problems. When a subscriber method executes too slowly, the retry thread may pick up messages that have not yet been executed. By default, the retry thread picks up messages from 4 minutes (FallbackWindowLookbackSeconds) ago, so messages that have been backlogged on the consumer side for more than 4 minutes will be picked up and executed again.

SubscriberParallelExecuteThreadCount

Default: Environment.ProcessorCount

When EnableSubscriberParallelExecute is enabled, specifies the number of threads used for parallel task execution.

SubscriberParallelExecuteBufferFactor

Default: 1

When EnableSubscriberParallelExecute is enabled, this multiplier determines the buffer capacity for subscriber parallel execution. The buffer capacity is this factor multiplied by SubscriberParallelExecuteThreadCount, the number of threads allocated for parallel processing.
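The three parallel-execution options work together, as in this sketch. The values are illustrative; with 4 threads and a factor of 2, the buffer capacity described above works out to 4 * 2 = 8 messages.

```csharp
services.AddCap(options =>
{
    options.EnableSubscriberParallelExecute = true;
    options.SubscriberParallelExecuteThreadCount = 4;   // default: Environment.ProcessorCount
    options.SubscriberParallelExecuteBufferFactor = 2;  // buffer capacity = 4 * 2 = 8

    // Transport and storage configuration omitted.
});
```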

EnablePublishParallelSend

Default: false (for 7.2 <= version < 8.1, the default was true)

By default, messages sent are first placed into the Channel in memory and then processed linearly. If set to true, the task of sending messages will be processed in parallel by the .NET thread pool, which will greatly increase the speed of sending.