# Stream and Decode reports using the Rust SDK (WebSocket)
Source: https://docs.chain.link/datalink/pull-delivery/tutorials/stream-decode/ws-rust

> For the complete documentation index, see [llms.txt](/llms.txt).

<PageTabs
  pages={[
  {
    name: "Stream and decode reports using the Go SDK",
    url: "/datalink/pull-delivery/tutorials/stream-decode/ws-go",
    icon: "/images/tutorial-icons/go_logo_black.png",
  },
  {
    name: "Stream and decode reports using the Rust SDK",
    url: "/datalink/pull-delivery/tutorials/stream-decode/ws-rust",
    icon: "/images/tutorial-icons/rust_logo_blk.svg",
  },
]}
/>

In this guide, you'll learn how to use the [Data Streams SDK](/data-streams/reference/streams-direct/streams-direct-rust-sdk) for Rust to stream and decode DataLink feeds from the Aggregation Network. You'll set up your Rust project, listen for real-time reports, decode them, and log their attributes.

## Requirements

- **Rust**: Make sure you have Rust installed. You can install Rust by following the instructions on the official [Rust website](https://www.rust-lang.org/tools/install).
- **API Credentials**: Access to DataLink requires API credentials to connect to the Aggregation Network. If you haven't already, [contact us](https://chain.link/contact) to request access.

## Guide

### Set up your Rust project

1. Create a new directory for your project and navigate to it:

   ```bash
   mkdir my-datalink-project && cd my-datalink-project
   ```

2. Initialize a new Rust project:

   ```bash
   cargo init
   ```

3. Add the following dependencies to your `Cargo.toml` file:

   ```toml
   [dependencies]
   chainlink-data-streams-sdk = "1.0.0"
   chainlink-data-streams-report = "1.0.0"
   tokio = { version = "1.4", features = ["full"] }
   hex = "0.4"
   tracing = "0.1"
   tracing-subscriber = { version = "0.3", features = ["time"] }
   ```

   Note: The `tracing` feature is required for logging functionality.

### Understanding Report Schema Versions

Data Providers may use different report schema versions. The schema version determines the structure of the data returned by the feed and affects how you should decode the report.

1. Import the appropriate schema version in your code (e.g., `v4`).
2. Use that version when decoding the report with `report.Decode[v4.Data]()`.

Different schema versions have different fields and structures.

In this example, we're using report schema `v4` for the EUR/USD feed, but your implementation should match the schema version specified by your Data Provider.

### Establish a WebSocket connection and listen for real-time reports

1. Replace the contents of `src/main.rs` with the following code:

   ```rust
   use chainlink_data_streams_report::feed_id::ID;
   use chainlink_data_streams_report::report::{ decode_full_report, v4::ReportDataV4 };
   use chainlink_data_streams_sdk::config::Config;
   use chainlink_data_streams_sdk::stream::Stream;
   use std::env;
   use std::error::Error;
   use tracing::{ info, warn };
   use tracing_subscriber::fmt::time::UtcTime;

   #[tokio::main]
   async fn main() -> Result<(), Box<dyn Error>> {
      // Initialize logging with UTC timestamps
      tracing_subscriber
         ::fmt()
         .with_timer(UtcTime::rfc_3339())
         .with_max_level(tracing::Level::INFO)
         .init();

      // Get feed IDs from command line arguments
      let args: Vec<String> = env::args().collect();
      if args.len() < 2 {
         eprintln!("Usage: cargo run [FeedID1] [FeedID2] ...");
         std::process::exit(1);
      }

      // Get API credentials from environment variables
      let api_key = env::var("API_KEY").expect("API_KEY must be set");
      let api_secret = env::var("API_SECRET").expect("API_SECRET must be set");

      // Parse feed IDs from command line arguments
      let mut feed_ids = Vec::new();
      for arg in args.iter().skip(1) {
         let feed_id = ID::from_hex_str(arg)?;
         feed_ids.push(feed_id);
      }

      // Initialize the configuration
      let config = Config::new(
         api_key,
         api_secret,
         "https://api.testnet-dataengine.chain.link".to_string(),
         "wss://ws.testnet-dataengine.chain.link".to_string()
      ).build()?;

      // Create and initialize the stream
      let mut stream = Stream::new(&config, feed_ids).await?;
      stream.listen().await?;

      info!("WebSocket connection established. Listening for reports...");

      // Process incoming reports
      loop {
         match stream.read().await {
               Ok(response) => {
                  info!("\nRaw report data: {:?}\n", response.report);

                  // Decode the report
                  let full_report = hex::decode(&response.report.full_report[2..])?;
                  let (_report_context, report_blob) = decode_full_report(&full_report)?;
                  let report_data = ReportDataV4::decode(&report_blob)?;

                  // Print decoded report details
                  info!(
                     "\n--- Report Feed ID: {} ---\n\
                     ------------------------------------------\n\
                     Observations Timestamp : {}\n\
                     Benchmark Price       : {}\n\
                     Valid From Timestamp  : {}\n\
                     Expires At           : {}\n\
                     Link Fee             : {}\n\
                     Native Fee           : {}\n\
                     Market Status        : {}\n\
                     ------------------------------------------",
                     response.report.feed_id.to_hex_string(),
                     response.report.observations_timestamp,
                     report_data.price,
                     response.report.valid_from_timestamp,
                     report_data.expires_at,
                     report_data.link_fee,
                     report_data.native_fee,
                     report_data.market_status
                  );

                  // Print stream stats
                  info!(
                     "\n--- Stream Stats ---\n{:#?}\n\
                        --------------------------------------------------------------------------------------------------------------------------------------------",
                     stream.get_stats()
                  );
               }
               Err(e) => {
                  warn!("Error reading from stream: {:?}", e);
               }
         }
      }

      // Note: In a production environment, you should implement proper cleanup
      // by calling stream.close() when the application is terminated.
      // For example:
      //
      // tokio::select! {
      //     _ = tokio::signal::ctrl_c() => {
      //         info!("Received shutdown signal");
      //         stream.close().await?;
      //     }
      //     result = stream.read() => {
      //         // Process result
      //     }
      // }
   }
   ```

2. Set up your API credentials as environment variables:

   ```bash
   export API_KEY="<YOUR_API_KEY>"
   export API_SECRET="<YOUR_API_SECRET>"
   ```

   Replace `<YOUR_API_KEY>` and `<YOUR_API_SECRET>` with your API credentials.

3. For this example, you'll subscribe to the EUR/USD DataLink feed on testnet. This feed ID is 0x0004b9905d8337c34e00f8dbe31619428bac5c3937e73e6af75c71780f1770ce.

   Build and run your application:

   ```bash
   cargo run -- 0x0004b9905d8337c34e00f8dbe31619428bac5c3937e73e6af75c71780f1770ce
   ```

   Expect output similar to the following in your terminal:

   ```bash
   2025-06-03T16:18:20.232313Z  INFO my_data_link_project: WebSocket connection established. Listening for reports...
   2025-06-03T16:18:20.232481Z  INFO chainlink_data_streams_sdk::stream::monitor_connection: Received ping: [49]
   2025-06-03T16:18:20.232534Z  INFO chainlink_data_streams_sdk::stream::monitor_connection: Responding with pong: [49]
   2025-06-03T16:18:20.550416Z  INFO chainlink_data_streams_sdk::stream::monitor_connection: Received new report from Data Streams Endpoint.
   2025-06-03T16:18:20.550857Z  INFO my_data_link_project:
   Raw report data: Report { feed_id: 0x0004b9905d8337c34e00f8dbe31619428bac5c3937e73e6af75c71780f1770ce, valid_from_timestamp: 1748967500, observations_timestamp: 1748967500, full_report: "0x00090d9e8d96765a0c49e03a6ae05c82e8f8de70cf179baa632f18313e54bd6900000000000000000000000000000000000000000000000000000000004165ff000000000000000000000000000000000000000000000000000000030000000100000000000000000000000000000000000000000000000000000000000000e000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000260010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001000004b9905d8337c34e00f8dbe31619428bac5c3937e73e6af75c71780f1770ce00000000000000000000000000000000000000000000000000000000683f204c00000000000000000000000000000000000000000000000000000000683f204c00000000000000000000000000000000000000000000000000006f12bdac46c0000000000000000000000000000000000000000000000000004f29241147b58e000000000000000000000000000000000000000000000000000000006866ad4c0000000000000000000000000000000000000000000000000fcb2a7202a220000000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000291e7ab37d47a051d06bf9a17e743a30305560fa1ed63eb1e94530b9ff8e00998f2dc9e4876e60bde9f43fbbeb3a1c98bca91b71c98f25a329aa4843a1cdf5acc00000000000000000000000000000000000000000000000000000000000000023d6c77dce452fedcb47942020c574f291fdb259c64e2a42cfce0fbe2f2df092a3703bb5e167b80f388c323ec1d3cf9d298bc077d903a4297671c395e6d34b550" }

   2025-06-03T16:18:20.551775Z  INFO my_data_link_project:
   --- Report Feed ID: 0x0004b9905d8337c34e00f8dbe31619428bac5c3937e73e6af75c71780f1770ce ---
   ------------------------------------------
   Observations Timestamp : 1748967500
   Benchmark Price       : 1138050000000000000
   Valid From Timestamp  : 1748967500
   Expires At           : 1751559500
   Link Fee             : 22281758045615502
   Native Fee           : 122126282278592
   Market Status        : 2
   ------------------------------------------
   2025-06-03T16:18:20.551946Z  INFO my_data_link_project:
   --- Stream Stats ---
   StatsSnapshot {
      accepted: 1,
      deduplicated: 0,
      total_received: 1,
      partial_reconnects: 0,
      full_reconnects: 0,
      configured_connections: 1,
      active_connections: 1,
   }
   --------------------------------------------------------------------------------------------------------------------------------------------
   2025-06-03T16:18:21.503569Z  INFO chainlink_data_streams_sdk::stream::monitor_connection: Received new report from Data Streams Endpoint.
   2025-06-03T16:18:21.503786Z  INFO my_data_link_project:
   Raw report data: Report { feed_id: 0x0004b9905d8337c34e00f8dbe31619428bac5c3937e73e6af75c71780f1770ce, valid_from_timestamp: 1748967501, observations_timestamp: 1748967501, full_report: "0x00090d9e8d96765a0c49e03a6ae05c82e8f8de70cf179baa632f18313e54bd690000000000000000000000000000000000000000000000000000000000416602000000000000000000000000000000000000000000000000000000030000000100000000000000000000000000000000000000000000000000000000000000e000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000260010100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001000004b9905d8337c34e00f8dbe31619428bac5c3937e73e6af75c71780f1770ce00000000000000000000000000000000000000000000000000000000683f204d00000000000000000000000000000000000000000000000000000000683f204d00000000000000000000000000000000000000000000000000006f120e9d6f70000000000000000000000000000000000000000000000000004f2899581124ed000000000000000000000000000000000000000000000000000000006866ad4d0000000000000000000000000000000000000000000000000fcb2a7202a2200000000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000002bfc1839b35307881f3bca8fb4d5f08dc3da6d60f8ed43e31b36bbccbdc8e1abb9f8bf045a6c3cb96fba8536e81b3e92a54b4762cd1a85ad552cf4c664715c0bd00000000000000000000000000000000000000000000000000000000000000023b0c7ad0fdfdb598d53fee4d7026957c1c30e8c9056a267dc40d8ee8000168a72d6a2349a71f41a200b8f600fbedfb5b4c4ebf69ac86a794748268a9b3972594" }

   2025-06-03T16:18:21.504481Z  INFO my_data_link_project:
   --- Report Feed ID: 0x0004b9905d8337c34e00f8dbe31619428bac5c3937e73e6af75c71780f1770ce ---
   ------------------------------------------
   Observations Timestamp : 1748967501
   Benchmark Price       : 1138050000000000000
   Valid From Timestamp  : 1748967501
   Expires At           : 1751559501
   Link Fee             : 22281162232767725
   Native Fee           : 122123345293168
   Market Status        : 2
   ------------------------------------------
   2025-06-03T16:18:21.504537Z  INFO my_data_link_project:
   --- Stream Stats ---
   StatsSnapshot {
      accepted: 2,
      deduplicated: 0,
      total_received: 2,
      partial_reconnects: 0,
      full_reconnects: 0,
      configured_connections: 1,
      active_connections: 1,
   }
   [...]
   ```

The example above demonstrates streaming data from a single crypto stream. For production environments, especially when subscribing to multiple streams, it's recommended to enable [High Availability (HA) mode](https://github.com/smartcontractkit/data-streams-sdk/blob/main/rust/docs/examples/wss_multiple.md). This can be achieved by:

1. Adding multiple WebSocket endpoints in the configuration:

   ```rust
   "wss://ws.testnet-dataengine.chain.link,wss://ws.testnet-dataengine.chain.link"
   ```

2. Enabling HA mode in the configuration:
   ```rust
   use chainlink_data_streams_sdk::config::WebSocketHighAvailability;
   // ...
   .with_ws_ha(WebSocketHighAvailability::Enabled)
   ```

When HA mode is enabled and multiple WebSocket origins are provided, the Stream will maintain concurrent connections to different instances. This ensures high availability, fault tolerance, and minimizes the risk of report gaps.

#### Decoded report details

The decoded report details include:

| Attribute                | Value                                                                | Description                                                                                                                                                                                                 |
| ------------------------ | -------------------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `Feed ID`                | `0x0004b9905d8337c34e00f8dbe31619428bac5c3937e73e6af75c71780f1770ce` | The unique identifier for the feed. In this example, the feed is for EUR/USD.                                                                                                                               |
| `Observations Timestamp` | `1748967501`                                                         | The timestamp indicating when the data was captured.                                                                                                                                                        |
| `Benchmark Price`        | `1138050000000000000`                                                | The observed price in the report, with 18 decimals. For readability: `1.138050000000000000` USD per EUR.                                                                                                    |
| `Valid From Timestamp`   | `1748967501`                                                         | The start validity timestamp for the report, indicating when the data becomes relevant.                                                                                                                     |
| `Expires At`             | `1751559501`                                                         | The expiration timestamp of the report, indicating the point at which the data becomes outdated.                                                                                                            |
| `Link Fee`               | `22281162232767725`                                                  | The fee to pay in LINK tokens for the onchain verification of the report data. With 18 decimals. For readability: `0.022281162232767725` LINK. **Note:** This example fee is not indicative of actual fees. |
| `Native Fee`             | `122123345293168`                                                    | The fee to pay in the native blockchain token (e.g., ETH on Ethereum) for the onchain verification of the report data. With 18 decimals. **Note:** This example fee is not indicative of actual fees.       |
| `Market Status`          | `2`                                                                  | The market status for the feed. In this example, `2` indicates the market is `Open`.                                                                                                                        |

#### Payload for onchain verification

In this guide, you log and decode the `full_report` payload to extract the report data. In a
production environment, you should verify the data to ensure its integrity and authenticity. Refer to the
[Verify report data onchain](/datalink/pull-delivery/tutorials/onchain-verification-evm) guide.

## Adapting code for different report schema versions

When working with different DataLink providers, you'll need to adapt your code to handle the specific report schema version they use:

1. Import the correct schema version module. Examples:

   - For v4 schema (as used in this example):

   ```rust
   use chainlink_data_streams_report::report::{ decode_full_report, v4::ReportDataV4 };
   ```

   - For v3 schema:

   ```rust
   use chainlink_data_streams_report::report::{ decode_full_report, v3::ReportDataV3 };
   ```

2. Update the decode function to use the correct schema version. Examples:

   - For v4 schema (as used in this example):

   ```rust
   let report_data = ReportDataV4::decode(&report_blob)?;
   ```

   - For v3 schema:

   ```rust
   let report_data = ReportDataV3::decode(&report_blob)?;
   ```

3. Access fields according to the schema version structure.

## Explanation

### Establishing a WebSocket connection and listening for reports

The WebSocket connection is established in two steps:

1. [`Stream::new`](https://github.com/smartcontractkit/data-streams-sdk/blob/main/rust/crates/sdk/src/stream.rs#L131) initializes a new stream instance with your configuration and feed IDs. This function prepares the connection parameters but doesn't establish the connection yet.

2. [`stream.listen()`](https://github.com/smartcontractkit/data-streams-sdk/blob/main/rust/crates/sdk/src/stream.rs#L162) establishes the actual WebSocket connection and starts the background tasks that maintain the connection. These tasks handle:
   - Automatic reconnection if the connection is lost
   - Ping/pong messages to keep the connection alive
   - Message queueing and delivery

### Decoding a report

As data reports arrive via the WebSocket connection, they are processed in real-time through several steps:

1. Reading streams: The [`read`](https://github.com/smartcontractkit/data-streams-sdk/blob/main/rust/crates/sdk/src/stream.rs#L218) method on the Stream object is called within a loop. This asynchronous method:
   - Awaits the next report from the WebSocket connection
   - Handles backpressure automatically
   - Returns a [`WebSocketReport`](https://github.com/smartcontractkit/data-streams-sdk/blob/main/rust/crates/sdk/src/stream.rs#L43) containing the report data

2. Decoding reports: Each report is decoded in two stages:
   - [`decode_full_report`](https://github.com/smartcontractkit/data-streams-sdk/blob/main/rust/crates/report/src/report.rs#L77) parses the raw hexadecimal data, separating the report context (containing metadata) from the report blob
   - [`ReportDataV4::decode`](https://github.com/smartcontractkit/data-streams-sdk/blob/main/rust/crates/report/src/report/v4.rs#L57) transforms the report blob into a structured format containing:
     - The benchmark price (with 18 decimal places)
     - Fee information for onchain verification (LINK and native fees)
     - Expiration timestamp
     - Market status indicator

### Handling the decoded data

The example demonstrates several best practices for handling the decoded data:

1. Logging:
   - Uses the [`tracing`](https://github.com/tokio-rs/tracing) crate for structured logging
   - Configures UTC timestamps for consistent time representation
   - Includes both raw report data and decoded fields for debugging

2. Error handling:
   - Uses Rust's `Result` type for robust error handling
   - Implements the `?` operator for clean error propagation
   - Logs errors with appropriate context using `warn!` macro

3. Stream monitoring:
   - Tracks stream statistics through [`get_stats()`](https://github.com/smartcontractkit/data-streams-sdk/blob/main/rust/crates/sdk/src/stream.rs#L253)
   - Monitors connection status and reconnection attempts
   - Reports message acceptance and deduplication counts

The decoded data can be used for further processing, analysis, or display in your application. For production environments, it's recommended to verify the data onchain using the provided `full_report` payload.