cohere.embed

Conduit processor for Cohere's embed model.

Description

The Cohere embed processor extracts text from the configured inputField, generates embeddings using Cohere's embedding model, and stores the result in the configured outputField. The embeddings are compressed using the zstd algorithm for efficient storage and transmission.

Configuration parameters

version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      # define source and destination ...
    processors:
      - id: example
        plugin: "cohere.embed"
        settings:
          # APIKey is the API key for Cohere API calls.
          # Type: string
          apiKey: ""
          # Maximum number of retries for an individual record when backing off
          # following an error.
          # Type: float
          backoffRetry.count: "0"
          # The multiplying factor for each increment step.
          # Type: float
          backoffRetry.factor: "2"
          # The maximum waiting time before retrying.
          # Type: duration
          backoffRetry.max: "5s"
          # The minimum waiting time before retrying.
          # Type: duration
          backoffRetry.min: "100ms"
          # Specifies the field from which the request body should be created.
          # Type: string
          inputField: ".Payload.After"
          # Specifies the type of input passed to the model. Required for embed
          # models v3 and higher. Allowed values: search_document, search_query,
          # classification, clustering, image.
          # Type: string
          inputType: ""
          # MaxTextsPerRequest controls the number of texts sent in each Cohere
          # embedding API call (max 96).
          # Type: int
          maxTextsPerRequest: "96"
          # Model is one of the Cohere embed models.
          # Type: string
          model: "embed-english-v2.0"
          # OutputField specifies the field in which the response body is
          # saved.
          # Type: string
          outputField: ".Payload.After"
          # Whether to decode the record key using its corresponding schema from
          # the schema registry.
          # Type: bool
          sdk.schema.decode.key.enabled: "true"
          # Whether to decode the record payload using its corresponding schema
          # from the schema registry.
          # Type: bool
          sdk.schema.decode.payload.enabled: "true"
          # Whether to encode the record key using its corresponding schema from
          # the schema registry.
          # Type: bool
          sdk.schema.encode.key.enabled: "true"
          # Whether to encode the record payload using its corresponding schema
          # from the schema registry.
          # Type: bool
          sdk.schema.encode.payload.enabled: "true"
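
The four backoffRetry settings describe an exponential backoff: the wait starts at backoffRetry.min, is multiplied by backoffRetry.factor after each attempt, and never exceeds backoffRetry.max. The page does not spell out the exact schedule Conduit uses, so the following Python sketch shows only the common interpretation of these parameters. The function name `backoff_delays` is hypothetical and not part of Conduit.

```python
from datetime import timedelta


def backoff_delays(count, factor, min_wait, max_wait):
    """Return the wait before each retry, assuming plain exponential backoff.

    Assumption: the delay for retry i is min_wait * factor**i, capped at
    max_wait. This mirrors the setting names, not Conduit's actual code.
    """
    delays = []
    wait = min_wait
    for _ in range(count):
        delays.append(min(wait, max_wait))
        wait = wait * factor
    return delays


# Hypothetical settings: four retries, everything else at the defaults above.
delays = backoff_delays(4, 2, timedelta(milliseconds=100), timedelta(seconds=5))
print([d.total_seconds() for d in delays])  # [0.1, 0.2, 0.4, 0.8]
```

Under this reading, the default backoffRetry.count of "0" produces an empty schedule, meaning a failed record is not retried.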

Examples

Generate embeddings using Cohere's embedding model

This example demonstrates how to use the Cohere embedding processor to generate embeddings for a record. The processor extracts text from the configured "inputField" (default: ".Payload.After"), sends it to the Cohere API, and stores the resulting embeddings in the configured "outputField" as compressed data using the zstd algorithm.

In this example, the processor is configured with a mock client and an API key. The processor also records the embedding model it used ("embed-english-v2.0") in the record's metadata under the key "cohere.embed.model".

Configuration parameters

version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      # define source and destination ...
    processors:
      - id: example
        plugin: "cohere.embed"
        settings:
          apiKey: "fake-api-key"
          backoffRetry.count: "0"
          backoffRetry.factor: "2"
          backoffRetry.max: "5s"
          backoffRetry.min: "100ms"
          inputField: ".Payload.After"
          maxTextsPerRequest: "96"
          model: "embed-english-v2.0"
          outputField: ".Payload.After"
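
The maxTextsPerRequest setting caps how many texts go into a single Cohere embedding API call, with an upper limit of 96. How the processor groups records internally is not described here, so the sketch below only illustrates the general idea of splitting a list of texts into request-sized batches. The function name `batch_texts` is hypothetical.

```python
def batch_texts(texts, max_texts_per_request=96):
    """Split texts into consecutive batches of at most max_texts_per_request.

    Assumption: batches are filled in order and only the last one may be
    smaller. The 96 limit comes from the setting's description.
    """
    if not 1 <= max_texts_per_request <= 96:
        raise ValueError("max_texts_per_request must be between 1 and 96")
    return [
        texts[i:i + max_texts_per_request]
        for i in range(0, len(texts), max_texts_per_request)
    ]


# 200 placeholder texts need three API calls at the default batch size.
batches = batch_texts([f"text {n}" for n in range(200)])
print([len(b) for b in batches])  # [96, 96, 8]
```

A smaller value trades more API calls for smaller request bodies, which may help when individual texts are long.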

Record difference

Before

{
  "position": "cG9zLTE=",
  "operation": "create",
  "metadata": {},
  "key": null,
  "payload": {
    "before": null,
    "after": "test input"
  }
}

After

{
  "position": "cG9zLTE=",
  "operation": "create",
  "metadata": {
    "cohere.embed.model": "embed-english-v2.0"
  },
  "key": null,
  "payload": {
    "before": null,
    "after": "(�/�\u0004\u0000i\u0000\u0000[0.1,0.2,0.3]�^xH"
  }
}