Step 1: Import required libraries
from python_graphql_client import GraphqlClient
import asyncio
This code block imports the GraphqlClient library and asyncio for asynchronous operations.
Step 2: Set up the OAuth token
auth_token = "Bearer YOUR_ACCESS_TOKEN"
Set up the OAuth token. Replace YOUR_ACCESS_TOKEN
with your actual access token.
Step 3: Set up the GraphQL client
ws = GraphqlClient(endpoint="wss://streaming.bitquery.io/graphql", headers={"Authorization": auth_token})
This code sets up a GraphqlClient instance with the Bitquery WebSocket endpoint and OAuth authorization header.
Step 4: Define the subscription callback function
def callback(response):
print("got new OHLC data:")
print(response)
This function is called every time new data is received from the WebSocket stream. The function prints the response.
Step 5: Define the GraphQL subscription query
query = """
subscription {
EVM(network: bsc) {
Transfers {
Block {
Number
Hash
}
Transfer {
Currency {
Symbol
Name
}
Amount
}
}
}
}
"""
This code block defines the GraphQL subscription query, which is sent to the Bitquery API to subscribe to the Binance Smart Chain network’s Transfer events.
Step 6: Make an asynchronous subscription request
asyncio.run(ws.subscribe(query=query, handle=callback))
This code block makes an asynchronous subscription request to the Bitquery API with the defined subscription query and the handle
parameter set to callback
. The handle
parameter specifies the function that should be called when new data is received.
Output on Terminal
Final Code
import asyncio
from python_graphql_client import GraphqlClient
auth_token = "Bearer YOUR_ACCESS_TOKEN"
ws = GraphqlClient(endpoint="wss://streaming.bitquery.io/graphql", headers={"Authorization": auth_token})
def callback(response):
print("got new OHLC data:")
print(response)
async def subscribe_to_transfers():
query = """
subscription {
EVM(network: bsc) {
Transfers {
Block {
Number
Hash
}
Transfer {
Currency {
Symbol
Name
}
Amount
}
}
}
}
"""
await ws.subscribe(query=query, handle=callback)
asyncio.run(subscribe_to_transfers())
This code uses OAuth for authentication by setting the authorization header with a Bearer token. Make sure to replace YOUR_ACCESS_TOKEN
with your actual access token.