What are gRPC streams?

Geyser or gRPC streams provide the fastest and most efficient way to stream Solana data directly to your backend. With gRPC, you can subscribe to blocks, slots, transactions, and account updates. It is highly configurable. You can filter or limit each subscription. It allows the client-server to create new subscriptions or cancel existing ones immediately.

gRPC is available through OrbitFlare Pass, gRPC Subscription, or Dedicated gRPC Nodes. Please check our Products page for more details on each option.

Subscribe Request

In the subscribe request, you need to include the following:

  • commitment: Specifies the commitment level, which can be processed, confirmed, or finalized.
  • accountsDataSlice: An array of objects { offset: uint64, length: uint64 } that allows you to receive only the required data slices from accounts.
  • ping: An optional boolean. Some cloud providers (e.g., Cloudflare) close idle streams. To keep the connection alive, set this to true. The server will respond with a Pong message every 15 seconds, avoiding the need to resend filters.
const subscriptionRequest: SubscribeRequest = {
  commitment: CommitmentLevel.CONFIRMED,
  accountsDataSlice: [],
  transactions: {},
  accounts: {},
  slots: {},
  blocks: {},
  blocksMeta: {},
  entry: {},
}

Next, you’ll need to specify the filters for the data you want to subscribe to, such as accounts, blocks, slots, or transactions.

Slots

  • filterByCommitment: By default, slots are sent for all commitment levels. With this filter, you can choose to receive only the selected commitment level.
  • interslotUpdates: Enables the subscription to receive updates for changes within a slot, not just at the beginning of new slots. This is useful for more granular, real-time slot data.

Examples

TypeScript Slot Subscription Example

import Client, { CommitmentLevel, SubscribeRequest } from "@triton-one/yellowstone-grpc";

const GRPC_URL = "<orbitflare-geyser-endpoint-url>";
const X_TOKEN = "<orbitflare-geyser-x-token>";
const PING_INTERVAL_MS = 30_000; // 30s

async function main() {
  // Open connection
  const client = new Client(GRPC_URL, X_TOKEN, {
    "grpc.max_receive_message_length": 64 * 1024 * 1024, // 64MiB
  });

  // Subscribe for events
  const stream = await client.subscribe();

  // Create `error` / `end` handler
  const streamClosed = new Promise<void>((resolve, reject) => {
    stream.on("error", (error) => {
      reject(error);
      stream.end();
    });
    stream.on("end", () => {
      resolve();
    });
    stream.on("close", () => {
      resolve();
    });
  });

  // Handle updates
  stream.on("data", (data) => {
    let ts = new Date();
    if (data.filters[0] == "slot") {
      console.log(
        `${ts.toUTCString()}: Received slot update: ${data.slot.slot}`
      );
    } else if (data.pong) {
      console.log(`${ts.toUTCString()}: Processed ping response!`);
    }
  });

  // Example subscribe request.
  // Listen to all slot updates.
  const slotRequest: SubscribeRequest = {
    slots: {
      slot: { filterByCommitment: true },
    },
    commitment: CommitmentLevel.CONFIRMED,

    // Required, but unused arguments
    accounts: {},
    accountsDataSlice: [],
    transactions: {},
    transactionsStatus: {},
    blocks: {},
    blocksMeta: {},
    entry: {},
  };

  // Send subscribe request
  await new Promise<void>((resolve, reject) => {
    stream.write(slotRequest, (err) => {
      if (err === null || err === undefined) {
        resolve();
      } else {
        reject(err);
      }
    });
  }).catch((reason) => {
    console.error(reason);
    throw reason;
  });

  // Send pings every 5s to keep the connection open
  const pingRequest: SubscribeRequest = {
    ping: { id: 1 },
    // Required, but unused arguments
    accounts: {},
    accountsDataSlice: [],
    transactions: {},
    transactionsStatus: {},
    blocks: {},
    blocksMeta: {},
    entry: {},
    slots: {},
  };
  setInterval(async () => {
    await new Promise<void>((resolve, reject) => {
      stream.write(pingRequest, (err) => {
        if (err === null || err === undefined) {
          resolve();
        } else {
          reject(err);
        }
      });
    }).catch((reason) => {
      console.error(reason);
      throw reason;
    });
  }, PING_INTERVAL_MS);

  await streamClosed;
}

main();

TypeScript Transaction Subscription with Parsed Data

import Client, {
  CommitmentLevel,
  SubscribeRequest,
  SubscribeRequestFilterAccountsFilter
} from "@triton-one/yellowstone-grpc";
import bs58 from 'bs58';

const GRPC_URL = "<orbitflare-geyser-endpoint-url>";
const X_TOKEN = "<orbitflare-geyser-x-token>";
const PING_INTERVAL_MS = 30_000; // 30s

// Add this utility function to process the transaction object
function convertBuffers(obj: any): any {
  if (obj === null || obj === undefined) {
    return obj;
  }

  // Handle Buffer objects
  if (obj.type === 'Buffer' && Array.isArray(obj.data)) {
    return bs58.encode(new Uint8Array(obj.data));
  }

  // Handle arrays
  if (Array.isArray(obj)) {
    return obj.map(item => convertBuffers(item));
  }

  // Handle objects
  if (typeof obj === 'object') {
    // Handle Uint8Array directly
    if (obj instanceof Uint8Array) {
      return bs58.encode(obj);
    }

    const converted: any = {};
    for (const [key, value] of Object.entries(obj)) {
      // Skip certain keys that shouldn't be converted
      if (key === 'uiAmount' || key === 'decimals' || key === 'uiAmountString') {
        converted[key] = value;
      } else {
        converted[key] = convertBuffers(value);
      }
    }
    return converted;
  }

  return obj;
}

async function main() {
  // Open connection.
  const client = new Client(GRPC_URL, X_TOKEN, {
    "grpc.max_receive_message_length": 1024 * 1024 * 1024, // 1GB
  });

  // Subscribe for events
  const stream = await client.subscribe();

  // Create `error` / `end` handler
  const streamClosed = new Promise<void>((resolve, reject) => {
    stream.on("error", (error) => {
      reject(error);
      stream.end();
    });
    stream.on("end", () => {
      resolve();
    });
    stream.on("close", () => {
      resolve();
    });
  });

  // Handle updates
  stream.on("data", (data) => {
    let ts = new Date();
    if (data) {
      if(data.transaction) {
        const tx = data.transaction;
        // Convert the entire transaction object
        const convertedTx = convertBuffers(tx);
        // If you want to see the entire converted transaction:
        console.log(`${ts.toUTCString()}: Received update: ${JSON.stringify(convertedTx)}`);
      }
      else {
        console.log(`${ts.toUTCString()}: Received update: ${data}`);
      }
      stream.end();
    } else if (data.pong) {
      console.log(`${ts.toUTCString()}: Processed ping response!`);
    }
  });

  // Example subscribe request.
  const request: SubscribeRequest = {
    commitment: CommitmentLevel.PROCESSED,
    accountsDataSlice: [],
    ping: undefined,
    transactions: {
      client: {
        vote: false,
        failed: false,
        accountInclude: [
          "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8"
        ],
        accountExclude: [],
        accountRequired: [],
      },
    },
    // unused arguments
    accounts: {},
    slots: {},
    transactionsStatus: {},
    entry: {},
    blocks: {},
    blocksMeta: {},
  };

  // Send subscribe request
  await new Promise<void>((resolve, reject) => {
    stream.write(request, (err: any) => {
      if (err === null || err === undefined) {
        resolve();
      } else {
        reject(err);
      }
    });
  }).catch((reason) => {
    console.error(reason);
    throw reason;
  });

  // Send pings every 30s to keep the connection open
  const pingRequest: SubscribeRequest = {
    // Required, but unused arguments
    accounts: {},
    accountsDataSlice: [],
    transactions: {},
    blocks: {},
    blocksMeta: {},
    slots: {},
    transactionsStatus: {},
    entry: {},
  };
  setInterval(async () => {
    await new Promise<void>((resolve, reject) => {
      stream.write(pingRequest, (err: null | undefined) => {
        if (err === null || err === undefined) {
          resolve();
        } else {
          reject(err);
        }
      });
    }).catch((reason) => {
      console.error(reason);
      throw reason;
    });
  }, PING_INTERVAL_MS);

  await streamClosed;
}

main();

Filter Configuration

It’s possible to add limits for filters in the config. If the filters field is omitted, then filters don’t have any limits.

{
  "grpc": {
    "filters": {
      "accounts": {
        "max": 1,
        "any": false,
        "account_max": 10,
        "account_reject": ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"],
        "owner_max": 10,
        "owner_reject": ["11111111111111111111111111111111"]
      },
      "slots": {
        "max": 1
      },
      "transactions": {
        "max": 1,
        "any": false,
        "account_include_max": 10,
        "account_include_reject": ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"],
        "account_exclude_max": 10,
        "account_required_max": 10
      },
      "blocks": {
        "max": 1,
        "account_include_max": 10,
        "account_include_any": false,
        "account_include_reject": ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"],
        "include_transactions": true,
        "include_accounts": false,
        "include_entries": false
      },
      "blocks_meta": {
        "max": 1
      },
      "entry": {
        "max": 1
      }
    }
  }
}

For more detailed gRPC examples, check out our Getting Started guides on account monitoring, transaction monitoring, entry monitoring, and slot and block monitoring.

Note: gRPC is typically accessed behind a load-balancer or proxy, which may terminate an inactive connection after 10 minutes. The best solution is to implement a ping to the gRPC server every few seconds or minutes to keep the connection alive.