// Working Java sample for subscribing to Bitquery, with or without query parameters

// To get it up and running you have to add several dependencies to your pom.xml
// (OkHttp incl. its logging-interceptor, Jackson databind, SLF4J and plexus-utils).

// This sample works as described in the Bitquery WebSocket documentation:
// https://docs.bitquery.io/docs/graphql/subscription/websockets/
// The only known problem: the connection has to be restarted rather often.

package at.bitquery;


import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.codehaus.plexus.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okhttp3.logging.HttpLoggingInterceptor;
import okhttp3.logging.HttpLoggingInterceptor.Level;
import utils.Environment;

/**
 * Base class for a Bitquery GraphQL subscription that is streamed over an OkHttp WebSocket.
 *
 * <p>An instance holds the HTTP client, the upgrade request, the query, optional query
 * variables and a subscription id. {@link #processSubscription(OkHttpClient, Request)} opens
 * the socket, sends the subscribe frame, dispatches every message of type {@code data} to
 * {@link #processData(JsonNode)} and reconnects on failures or {@code connection_error}
 * messages. Sub-classes are expected to override {@link #processData(JsonNode)}.
 *
 * <p>The subscribe frame is assembled by plain string concatenation (see
 * {@code buildSubscribeMessage()}), so the query string has to be shaped accordingly; the two
 * samples in {@link #main(String[])} show both shapes.
 */
public class WebSocketSubscriptionAllgemeinV2 {
    static final Logger logger = LoggerFactory.getLogger(WebSocketSubscriptionAllgemeinV2.class);

    /** Shared mapper; ObjectMapper is thread-safe once configured, so one instance is enough. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** RFC 6455 "normal closure". OkHttp rejects close codes outside the range [1000, 5000). */
    private static final int CLOSE_NORMAL = 1000;

    /** How long {@link #processSubscription(OkHttpClient, Request)} keeps the subscription alive. */
    private static final long SUBSCRIPTION_RUNTIME_MILLIS = TimeUnit.MINUTES.toMillis(70);

    /** Pause before a reconnection attempt so a permanently failing endpoint is not hammered. */
    private static final long RECONNECT_DELAY_MILLIS = 1000L;

    public OkHttpClient client;
    public Request request;
    private String queryString;
    private JsonNode jsonParameters;
    private String jsonParamString = null;
    // Written from OkHttp callback threads during reconnection and read when a socket opens.
    private volatile int subscriptionId;
    // The socket that is currently in use; replaced on every successful reconnection.
    private volatile WebSocket currentWebSocket;
    // Set once processSubscription() starts tearing down, to stop further reconnections.
    private volatile boolean shuttingDown;

    /**
     * Creates a subscription without query variables, using subscription id 1.
     *
     * @param queryString the payload fragment that is spliced into the subscribe frame
     * @throws Exception if the empty default parameter object cannot be created
     */
    public WebSocketSubscriptionAllgemeinV2(String queryString) throws Exception {
        this(queryString, MAPPER.readTree("{}"), 1);
    }

    /**
     * Creates a subscription whose variables are given as a JSON tree.
     *
     * @param queryString    the payload fragment that is spliced into the subscribe frame
     * @param parameters     the query variables; serialized by {@link #getParameterString()}
     * @param subscriptionId the id sent in the subscribe frame
     */
    public WebSocketSubscriptionAllgemeinV2(String queryString, JsonNode parameters, int subscriptionId) {
        this.client = buildHttpClient();
        this.request = buildRequest();
        this.queryString = queryString;
        this.jsonParameters = parameters;
        this.subscriptionId = subscriptionId;
    }

    /**
     * Creates a subscription whose variables are given as an already serialized JSON string.
     *
     * @param queryString     the payload fragment that is spliced into the subscribe frame
     * @param parameterString the query variables as JSON text, used verbatim
     * @param subscriptionId  the id sent in the subscribe frame
     */
    public WebSocketSubscriptionAllgemeinV2(String queryString, String parameterString, int subscriptionId) {
        this.client = buildHttpClient();
        this.request = buildRequest();
        this.queryString = queryString;
        this.jsonParamString = parameterString;
        this.subscriptionId = subscriptionId;
    }

    /**
     * Demonstration entry point only; the real logic is meant to live in sub-classes.
     * Runs a subscription without variables and afterwards one with variables.
     *
     * @param args unused
     * @throws Exception declared for convenience of the demo
     */
    public static void main(String[] args) throws Exception {
        String query = "{\"query\":\"subscription getNewBlock { EVM(network: eth) { Blocks(limit: {count: 2}) { Block { Number Time } } } }\"}}";
        String param = "{}";

        WebSocketSubscriptionAllgemeinV2 webSockSub = new WebSocketSubscriptionAllgemeinV2(query, param, 10);
        webSockSub.processSubscription(webSockSub.getClient(), webSockSub.getRequest());

        // Second sample WITH parameters.
        // processSubscription() blocks, so this only starts after the previous subscription has ended.
        String query2 = "{\"query\":\"subscription SubGetTxForPairAddressV2($netzwerk: evm_network, $tokenContractAddress: String, $pairContractAddress: String) { EVM(network: $netzwerk, trigger_on: head) { buyside: DEXTrades( orderBy: {descending: Block_Time} where: {Trade: {Sell: {Currency: {SmartContract: {is: $tokenContractAddress}}}, Dex: {SmartContract: {is: $pairContractAddress}, Pair: {SmartContract: {}}}}} ) { Transaction { From Hash } Trade { Buy { Amount Currency { Symbol } } Sell { Amount Currency { Name Symbol } } } } } }";
        String param2 = "{ \"tokenContractAddress\": \"0x7ae075546e8042dC263FA0eb6519ce0a04EABB93\", \"pairContractAddress\": \"0x83b33b4db2776f119b2ddd902419254775c8db15\", \"netzwerk\": \"eth\" }";

        webSockSub = new WebSocketSubscriptionAllgemeinV2(query2, param2, 10);
        webSockSub.processSubscription(webSockSub.getClient(), webSockSub.getRequest());
    }

    /**
     * Opens the WebSocket, subscribes and blocks the calling thread for 70 minutes (or until
     * it is interrupted). Afterwards the socket that is current at that time is closed and the
     * client's dispatcher executor is shut down.
     *
     * @param client  the OkHttp client used for the initial connection and for reconnections
     * @param request the WebSocket upgrade request
     */
    protected void processSubscription(OkHttpClient client, Request request) {
        shuttingDown = false;

        WebSocketListener webSocketListener = new WebSocketListener() {

            @Override
            public void onClosed(WebSocket webSocket, int code, String reason) {
                // Both peers have completed the close handshake here, so there is nothing left
                // to close (the former close(50, ...) call only threw IllegalArgumentException).
                logger.debug("WebSocket connection closed (code={}, reason={})", code, reason);
            }

            @Override
            public void onFailure(WebSocket webSocket, Throwable t, Response response) {
                logger.error("onFailure: " + t.getMessage(), t);
                // Make sure to close the response body
                if (response != null) {
                    response.close();
                }
                reconnectUnlessShuttingDown(client, request, this, webSocket);
            }

            @Override
            public void onOpen(WebSocket webSocket, Response response) {
                String subscriptionQuery = buildSubscribeMessage();
                if (logger.isDebugEnabled()) {
                    try {
                        logger.debug("subscriptionQuery=\n" + MAPPER.readTree(subscriptionQuery).toPrettyString());
                    } catch (Exception e) {
                        // Parsing is only done for pretty logging; the frame is sent regardless.
                        logger.debug("subscriptionQuery is not parseable as JSON: {}", subscriptionQuery, e);
                    }
                }
                webSocket.send(subscriptionQuery);
            }

            @Override
            public void onMessage(WebSocket webSocket, String text) {
                logger.debug("Received message: " + text);

                try {
                    JsonNode jsonNode = MAPPER.readTree(text);
                    JsonNode typeNode = (jsonNode == null) ? null : jsonNode.get("type");
                    if (typeNode == null) {
                        // No "type" field: neither data nor a connection error, nothing to do.
                        return;
                    }

                    String type = typeNode.asText();
                    if ("data".equals(type)) {
                        // Business processing runs off the socket reader thread.
                        CompletableFuture.runAsync(() -> processData(jsonNode))
                                .exceptionally(ex -> {
                                    logger.error("processData failed for message: " + text, ex);
                                    return null;
                                });
                    } else if ("connection_error".equalsIgnoreCase(type)) {
                        reconnectUnlessShuttingDown(client, request, this, webSocket);
                    }
                } catch (Exception e) {
                    logger.error("Could not handle message: " + text, e);
                }
            }
        };

        currentWebSocket = restartWebSocket(client, request, webSocketListener);
        try {
            Thread.sleep(SUBSCRIPTION_RUNTIME_MILLIS); // keep the subscription alive for 70 minutes
        } catch (InterruptedException e) {
            logger.warn("Subscription wait was interrupted; shutting down", e);
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        } finally {
            // Block further reconnections before tearing down, otherwise a late onFailure
            // would try to open a new socket on the already shut down executor.
            shuttingDown = true;
            WebSocket active = currentWebSocket;
            if (active != null) {
                active.close(CLOSE_NORMAL, "Closing WebSocket");
            }
            // Shutdown the client
            client.dispatcher().executorService().shutdown();
        }
    }

    /**
     * Assembles the subscribe frame by string concatenation:
     * the prefix with type "subscribe", the quoted subscription id and the "payload" key,
     * then the query string verbatim, then either the variables suffix (when the parameter
     * string is longer than 3 characters) or the literal " {}".
     *
     * <p>Because of the variables suffix, a query used together with parameters must end
     * inside its still open "query" string value, while a query without parameters must
     * already close the surrounding object itself (compare the samples in main).
     */
    private String buildSubscribeMessage() {
        String message = "{"
                + "\"type\":\"subscribe\","
                + "\"id\":\"" + getSubscriptionId() + "\","
                + "\"payload\":" + getQueryString();
        String parameters = getParameterString();
        // getParameterString() may return null; treat that like "no parameters".
        if (parameters != null && parameters.length() > 3) {
            return message + "\" , \"variables\":" + parameters + "} }";
        }
        return message + " {}";
    }

    /**
     * Reconnects after a failure unless the subscription is being shut down, and remembers the
     * replacement socket so that it (and not the dead one) is closed at the end.
     */
    private void reconnectUnlessShuttingDown(OkHttpClient client, Request request,
            WebSocketListener listener, WebSocket failedSocket) {
        if (shuttingDown) {
            logger.debug("Subscription is shutting down; not reconnecting");
            return;
        }
        try {
            Thread.sleep(RECONNECT_DELAY_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        if (shuttingDown) {
            return;
        }

        WebSocket replacement = initiateReconnection(client, request, listener, failedSocket);
        if (replacement == null) {
            return;
        }
        currentWebSocket = replacement;
        if (shuttingDown) {
            // Shutdown started while reconnecting; do not leave the new socket open.
            replacement.close(CLOSE_NORMAL, "Closing WebSocket");
        }
    }

    /**
     * Builds the WebSocket upgrade request for the Bitquery streaming endpoint, announcing the
     * {@code graphql-ws} sub-protocol and sending the API key from {@code Environment}.
     *
     * @return the request to pass to {@link OkHttpClient#newWebSocket}
     */
    protected static Request buildRequest() {
        return new Request.Builder()
                .url("wss://streaming.bitquery.io/graphql")
                .addHeader("Sec-WebSocket-Protocol", "graphql-ws")
                .addHeader("X-API-KEY", Environment.BITQUERY_APIKEY)
                .build();
    }

    /**
     * Builds the OkHttp client with body-level HTTP logging, connection retry, a connection
     * pool and long timeouts.
     *
     * @return the configured client
     */
    protected static OkHttpClient buildHttpClient() {
        HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
        interceptor.setLevel(Level.BODY); // Set the desired log level
        // Level.BODY also logs request headers; keep the API key out of the logs.
        interceptor.redactHeader("X-API-KEY");

        return new OkHttpClient.Builder()
                .retryOnConnectionFailure(true)
                .connectionPool(new ConnectionPool(5, 10, TimeUnit.MINUTES)) // Adjust maxIdleConnections and keepAliveDuration
                .connectTimeout(60, TimeUnit.MINUTES)
                .readTimeout(10, TimeUnit.MINUTES)
                .writeTimeout(5, TimeUnit.MINUTES)
                .addInterceptor(interceptor)
                //.pingInterval(90, TimeUnit.SECONDS)
                .build();
    }

    /**
     * Business processing of one {@code data} message. This default implementation only logs;
     * override it with your own logic. It is invoked asynchronously, not on the socket thread.
     *
     * @param jsonNode the complete parsed message
     */
    public void processData(JsonNode jsonNode) {
        logger.debug("...Processing of: " + jsonNode.toPrettyString());
        logger.warn("THIS method must be overwritten by your own business logic ");
    }

    /**
     * Opens a new WebSocket.
     *
     * @param client            the client to connect with
     * @param request           the upgrade request
     * @param webSocketListener the listener that receives the socket events
     * @return the newly created socket
     */
    public WebSocket restartWebSocket(OkHttpClient client, Request request, WebSocketListener webSocketListener) {
        return client.newWebSocket(request, webSocketListener);
    }

    /**
     * Replaces a broken connection: increments the subscription id, closes the previous socket
     * (if any) and opens a new one with the same listener.
     *
     * @param client            the client to connect with
     * @param request           the upgrade request
     * @param webSocketListener the listener to attach to the new socket
     * @param webSocket         the previous socket, may be {@code null}
     * @return the new socket, or {@code null} if it could not be created
     */
    public WebSocket initiateReconnection(OkHttpClient client, Request request, WebSocketListener webSocketListener,
            WebSocket webSocket) {
        logger.error("__Connection error detected. Reconnecting ...");
        System.err.print(".");
        setSubscriptionId(getSubscriptionId() + 1); // Increase the subscription ID for safety

        // Close the previous WebSocket instance to prevent resource leaks. The close code must
        // be a valid one: the former value 50 made close() throw, which aborted the whole
        // reconnection before a new socket was ever opened.
        if (webSocket != null) {
            try {
                webSocket.close(CLOSE_NORMAL, "Closing OLD potentially open WebSocket");
            } catch (RuntimeException e) {
                logger.warn("Could not close the previous WebSocket", e);
            }
        }

        // newWebSocket() only enqueues the connection attempt, so no extra thread is needed.
        try {
            return restartWebSocket(client, request, webSocketListener);
        } catch (RuntimeException e) {
            logger.error("Reconnection failed", e);
            return null;
        }
    }

    public OkHttpClient getClient() {
        return client;
    }

    public void setClient(OkHttpClient client) {
        this.client = client;
    }

    public Request getRequest() {
        return request;
    }

    public void setRequest(Request request) {
        this.request = request;
    }

    public String getQueryString() {
        return queryString;
    }

    public void setQueryString(String queryString) {
        this.queryString = queryString;
    }

    public JsonNode getJsonParameters() {
        return jsonParameters;
    }

    public void setJsonParameters(JsonNode jsonParameters) {
        this.jsonParameters = jsonParameters;
    }

    public int getSubscriptionId() {
        return subscriptionId;
    }

    public void setSubscriptionId(int subscriptionId) {
        this.subscriptionId = subscriptionId;
    }

    /**
     * Returns the query variables as JSON text: the string passed to the constructor if there
     * was one, otherwise the serialized {@link #getJsonParameters()} tree.
     *
     * @return the variables as JSON text, or {@code null} if serialization fails
     */
    public String getParameterString() {
        if (jsonParamString != null) {
            return jsonParamString;
        }
        try {
            return MAPPER.writeValueAsString(getJsonParameters());
        } catch (JsonProcessingException e) {
            logger.error("Could not serialize the subscription parameters", e);
            return null;
        }
    }
}
1 Like