Step 1: Import required libraries
from python_graphql_client import GraphqlClient
import asyncio
This code block imports the GraphqlClient library and asyncio for asynchronous operations.
Step 2: Set up the OAuth token
auth_token = "Bearer YOUR_ACCESS_TOKEN"
Set up the OAuth token. Replace YOUR_ACCESS_TOKEN with your actual access token.
Step 3: Set up the GraphQL client
ws = GraphqlClient(endpoint="wss://streaming.bitquery.io/graphql", headers={"Authorization": auth_token})
This code sets up a GraphqlClient instance with the Bitquery WebSocket endpoint and OAuth authorization header.
Step 4: Define the subscription callback function
def callback(response):
print("got new OHLC data:")
print(response)
This function is called every time new data is received from the WebSocket stream. The function prints the response.
Step 5: Define the GraphQL subscription query
query = """
subscription {
EVM(network: bsc) {
Transfers {
Block {
Number
Hash
}
Transfer {
Currency {
Symbol
Name
}
Amount
}
}
}
}
"""
This code block defines the GraphQL subscription query, which is sent to the Bitquery API to subscribe to the Binance Smart Chain network’s Transfer events.
Step 6: Make an asynchronous subscription request
asyncio.run(ws.subscribe(query=query, handle=callback))
This code block makes an asynchronous subscription request to the Bitquery API with the defined subscription query and the handle parameter set to callback. The handle parameter specifies the function that should be called when new data is received.
Output on Terminal
Final Code
import asyncio
from python_graphql_client import GraphqlClient
auth_token = "Bearer YOUR_ACCESS_TOKEN"
ws = GraphqlClient(endpoint="wss://streaming.bitquery.io/graphql", headers={"Authorization": auth_token})
def callback(response):
print("got new OHLC data:")
print(response)
async def subscribe_to_transfers():
query = """
subscription {
EVM(network: bsc) {
Transfers {
Block {
Number
Hash
}
Transfer {
Currency {
Symbol
Name
}
Amount
}
}
}
}
"""
await ws.subscribe(query=query, handle=callback)
asyncio.run(subscribe_to_transfers())
This code uses OAuth for authentication by setting the authorization header with a Bearer token. Make sure to replace YOUR_ACCESS_TOKEN with your actual access token.