// In order to get it up and running, you will have to include quite a lot of .JARs in your pom.xml
// THIS is working in a correct way.
// https://docs.bitquery.io/docs/graphql/subscription/websockets/
// The only problem seems to be => having to restart the connection a bit too often
package at.bitquery;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.codehaus.plexus.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okhttp3.logging.HttpLoggingInterceptor;
import okhttp3.logging.HttpLoggingInterceptor.Level;
import utils.Environment;
public class WebSocketSubscriptionAllgemeinV2 {
static Logger logger = LoggerFactory.getLogger(WebSocketSubscriptionAllgemeinV2.class);
public OkHttpClient client;
public Request request;
private String queryString;
private JsonNode jsonParameters;
private String jsonParamString = null;
private int subscriptionId;
public WebSocketSubscriptionAllgemeinV2 (String queryString) throws Exception {
this(queryString, (new ObjectMapper()).readTree("{}"),1);
}
public WebSocketSubscriptionAllgemeinV2 (String queryString, JsonNode parameters, int subscriptionId) {
this.client = buildHttpClient();
this. request = buildRequest();
this.queryString = queryString;
this.jsonParameters = parameters;
this.subscriptionId = subscriptionId;
}
public WebSocketSubscriptionAllgemeinV2 (String queryString, String parameterString, int subscriptionId) {
this.client = buildHttpClient();
this. request = buildRequest();
this.queryString = queryString;
this.jsonParamString = parameterString;
this.subscriptionId = subscriptionId;
}
// THE MAIN method has been added only for demonstration purposes
// the REAL logic takes place in sub-classes of this Super-class ...
public static void main(String[] args) throws Exception {
String query = "{\"query\":\"subscription getNewBlock { EVM(network: eth) { Blocks(limit: {count: 2}) { Block { Number Time } } } }\"}}";
String param = "{}";
WebSocketSubscriptionAllgemeinV2 webSockSub = new WebSocketSubscriptionAllgemeinV2(query, param,10);
webSockSub.processSubscription(webSockSub.getClient(),webSockSub.getRequest());
// 2nd Sample WITH PARAMETERS ...
// it will/would only run AFTER the previous Subscription has ended ...
String query2 = "{\"query\":\"subscription SubGetTxForPairAddressV2($netzwerk: evm_network, $tokenContractAddress: String, $pairContractAddress: String) { EVM(network: $netzwerk, trigger_on: head) { buyside: DEXTrades( orderBy: {descending: Block_Time} where: {Trade: {Sell: {Currency: {SmartContract: {is: $tokenContractAddress}}}, Dex: {SmartContract: {is: $pairContractAddress}, Pair: {SmartContract: {}}}}} ) { Transaction { From Hash } Trade { Buy { Amount Currency { Symbol } } Sell { Amount Currency { Name Symbol } } } } } }";
String param2 = "{ \"tokenContractAddress\": \"0x7ae075546e8042dC263FA0eb6519ce0a04EABB93\", \"pairContractAddress\": \"0x83b33b4db2776f119b2ddd902419254775c8db15\", \"netzwerk\": \"eth\" }";
webSockSub = new WebSocketSubscriptionAllgemeinV2(query2, param2,10);
webSockSub.processSubscription(webSockSub.getClient(),webSockSub.getRequest());
}
protected void processSubscription(OkHttpClient client, Request request) {
WebSocketListener webSocketListener = new WebSocketListener() {
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
logger.debug("WebSocket connection closed + closeWebSocket dazu");
if (webSocket != null) {
webSocket.close(50, "Closing WebSocket");
}
}
@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
logger.error("onFailure: " + t.getMessage(), t);
t.printStackTrace();
// Make sure to close the response body
if (response != null) {
response.close();
}
// Reconnect once again
webSocket = initiateReconnection(client, request, this, webSocket);
}
@Override
public void onOpen(WebSocket webSocket, Response response) {
String subscriptionQuery = "{" +
/*NEEDED !!!*/ "\"type\":\"subscribe\"," +
/*NEEDED !!!*/ "\"id\":\"" + getSubscriptionId() + "\"," +
"\"payload\":" + getQueryString();
if (getParameterString().length() > 3) {
subscriptionQuery = subscriptionQuery +
"\" , \"variables\":" + getParameterString() +
"} }";
} else {
subscriptionQuery = subscriptionQuery + " {}";
}
try {
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsNode = objectMapper.readTree(subscriptionQuery);
logger.debug("subscriptionQuery=\n" + jsNode.toPrettyString());
} catch (Exception e) {
e.printStackTrace();
}
webSocket.send(subscriptionQuery);
}
@Override
public void onMessage(WebSocket webSocket, String text) {
logger.debug("Received message: " + text);
try {
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = objectMapper.readTree(text);
JsonNode typeNode = jsonNode.get("type");
if (typeNode != null && typeNode.asText().equals("data")) {
CompletableFuture.runAsync(() -> processData(jsonNode));
} else {
//System.out.println("Received non-data message: >" + typeNode.toPrettyString() + "<");
if (StringUtils.equalsIgnoreCase(typeNode.toPrettyString(), "\"connection_error\"")) {
webSocket = initiateReconnection(client, request, this, webSocket);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
};
WebSocket webSocket = restartWebSocket(client, request, webSocketListener);
// Wait or sleep as needed (replace with your desired logic)
try {
Thread.sleep(70 * 60 * 1000); // Let the thread run for 70 minutes
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// Close the WebSocket connection
if (webSocket != null) {
webSocket.close(1000, "Closing WebSocket");
}
// Shutdown the client
client.dispatcher().executorService().shutdown();
}
}
protected static Request buildRequest() {
Request request = new Request.Builder()
.url("wss://streaming.bitquery.io/graphql")
.addHeader("Sec-WebSocket-Protocol", "graphql-ws")
.addHeader("X-API-KEY", Environment.BITQUERY_APIKEY)
.build();
return request;
}
protected static OkHttpClient buildHttpClient() {
HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
interceptor.setLevel(Level.BODY); // Set the desired log level
OkHttpClient client = new OkHttpClient.Builder()
.retryOnConnectionFailure(true)
.connectionPool(new ConnectionPool(5, 10, TimeUnit.MINUTES)) // Adjust maxIdleConnections and keepAliveDuration
.connectTimeout(60, TimeUnit.MINUTES)
.readTimeout(10, TimeUnit.MINUTES)
.writeTimeout(5, TimeUnit.MINUTES)
.addInterceptor(interceptor)
//.pingInterval(90, TimeUnit.SECONDS)
.build();
return client;
}
// DO THE BUSINESS processing of the JsonRequest
public void processData(JsonNode jsonNode) {
logger.debug("...Processing of: " + jsonNode.toPrettyString());
logger.warn("THIS method must be overwritten by your own business logic ");
};
public WebSocket restartWebSocket(OkHttpClient client, Request request, WebSocketListener webSocketListener) {
return client.newWebSocket(request, webSocketListener);
}
public WebSocket initiateReconnection(OkHttpClient client, Request request, WebSocketListener webSocketListener,
WebSocket webSocket) {
CompletableFuture<WebSocket> futureWebSocket = CompletableFuture.supplyAsync(() -> {
try {
logger.error("__Connection error detected. Reconnecting ...");
System.err.print(".");
setSubscriptionId(getSubscriptionId() + 1); // Increase the subscription ID for safety
// Close the previous WebSocket instance to prevent resource leaks
if (webSocket != null) {
webSocket.close(50, "Closing OLD potentially open WebSocket");
}
// Restart WebSocket and return the new WebSocket instance
WebSocket newWebSocket = restartWebSocket(client, request, webSocketListener);
return newWebSocket;
} catch (Exception e) {
e.printStackTrace();
return null;
}
});
try {
// Wait for the CompletableFuture to complete and get the new WebSocket instance
WebSocket newWebSocket = futureWebSocket.get();
return newWebSocket;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public OkHttpClient getClient() {
return client;
}
public void setClient(OkHttpClient client) {
this.client = client;
}
public Request getRequest() {
return request;
}
public void setRequest(Request request) {
this.request = request;
}
public String getQueryString() {
return queryString;
}
public void setQueryString(String queryString) {
this.queryString = queryString;
}
public JsonNode getJsonParameters() {
return jsonParameters;
}
public void setJsonParameters(JsonNode jsonParameters) {
this.jsonParameters = jsonParameters;
}
public int getSubscriptionId() {
return subscriptionId;
}
public void setSubscriptionId(int subscriptionId) {
this.subscriptionId = subscriptionId;
}
public String getParameterString () {
if (jsonParamString == null) {
try {
return (new ObjectMapper()).writeValueAsString(getJsonParameters());
} catch (JsonProcessingException e) {
e.printStackTrace();
}
} else {
return this.jsonParamString;
}
return null;
}
}
1 Like