Working Java-Sample for SUBSCRIBING to Bitquery with/without Parameters

// In order to get it up and running, you will have to include quite a lot of .JARs in your pom.xml

// THIS is working in a correct way.
// Web Sockets | Streaming API (V2 API Docs)
// The only problem seems to be => having to restart the connection a bit too often

package at.bitquery;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.codehaus.plexus.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okhttp3.logging.HttpLoggingInterceptor;
import okhttp3.logging.HttpLoggingInterceptor.Level;
import utils.Environment;

public class WebSocketSubscriptionAllgemeinV2 {
static Logger logger = LoggerFactory.getLogger(WebSocketSubscriptionAllgemeinV2.class);
public OkHttpClient client;
public Request request;
private String queryString;
private JsonNode jsonParameters;
private String jsonParamString = null;
private int subscriptionId;

public WebSocketSubscriptionAllgemeinV2 (String queryString) throws Exception {
	this(queryString, (new ObjectMapper()).readTree("{}"),1);
}

public WebSocketSubscriptionAllgemeinV2 (String queryString, JsonNode parameters, int subscriptionId) {
  	this.client = buildHttpClient();
    this. request = buildRequest();
    this.queryString = queryString;
    this.jsonParameters = parameters;
    this.subscriptionId = subscriptionId;
}

public WebSocketSubscriptionAllgemeinV2 (String queryString, String parameterString, int subscriptionId) {
  	this.client = buildHttpClient();
    this. request = buildRequest();
    this.queryString = queryString;
    this.jsonParamString = parameterString;
    this.subscriptionId = subscriptionId;
}

// THE MAIN method has been added only for demonstration purposes
// the REAL logic takes place in sub-classes of this Super-class ...
public static void main(String[] args) throws Exception {   	
	String query = "{\"query\":\"subscription getNewBlock { EVM(network: eth) { Blocks(limit: {count: 2}) { Block { Number Time } } } }\"}}";
	String param = "{}";
	
	WebSocketSubscriptionAllgemeinV2 webSockSub = new WebSocketSubscriptionAllgemeinV2(query, param,10);
	webSockSub.processSubscription(webSockSub.getClient(),webSockSub.getRequest());
	
	// 2nd Sample WITH PARAMETERS ...
	// it will/would only run AFTER the previous Subscription has ended ...
	String query2 = "{\"query\":\"subscription SubGetTxForPairAddressV2($netzwerk: evm_network, $tokenContractAddress: String, $pairContractAddress: String) { EVM(network: $netzwerk, trigger_on: head) { buyside: DEXTrades( orderBy: {descending: Block_Time} where: {Trade: {Sell: {Currency: {SmartContract: {is: $tokenContractAddress}}}, Dex: {SmartContract: {is: $pairContractAddress}, Pair: {SmartContract: {}}}}} ) { Transaction { From Hash } Trade { Buy { Amount Currency { Symbol } } Sell { Amount Currency { Name Symbol } } } } } }";
	String param2 = "{ \"tokenContractAddress\": \"0x7ae075546e8042dC263FA0eb6519ce0a04EABB93\", \"pairContractAddress\": \"0x83b33b4db2776f119b2ddd902419254775c8db15\", \"netzwerk\": \"eth\" }";
	
	webSockSub = new WebSocketSubscriptionAllgemeinV2(query2, param2,10);
	webSockSub.processSubscription(webSockSub.getClient(),webSockSub.getRequest());

}

protected void processSubscription(OkHttpClient client, Request request) {
       WebSocketListener webSocketListener = new WebSocketListener() {
    	   
    	   @Override
            public void onClosed(WebSocket webSocket, int code, String reason) {
                logger.debug("WebSocket connection closed + closeWebSocket dazu");
                if (webSocket != null) {
	                webSocket.close(50, "Closing WebSocket");
	            }
            }

            @Override
            public void onFailure(WebSocket webSocket, Throwable t, Response response) {
                logger.error("onFailure: " + t.getMessage(), t);
                t.printStackTrace();
                // Make sure to close the response body
                if (response != null) {
                    response.close();
                }
                // Reconnect once again
                webSocket = initiateReconnection(client, request, this, webSocket);
            }	            
            @Override
            public void onOpen(WebSocket webSocket, Response response) {
                String subscriptionQuery =  "{" + 

/NEEDED !!!/ “"type":"subscribe",” +
/NEEDED !!!/ “"id":"” + getSubscriptionId() + “",” +
“"payload":” + getQueryString();
if (getParameterString().length() > 3) {
subscriptionQuery = subscriptionQuery +
“" , "variables":” + getParameterString() +
“} }”;
} else {
subscriptionQuery = subscriptionQuery + " {}";
}
try {
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsNode = objectMapper.readTree(subscriptionQuery);
logger.debug(“subscriptionQuery=\n” + jsNode.toPrettyString());
} catch (Exception e) {
e.printStackTrace();
}
webSocket.send(subscriptionQuery);
}

            @Override
            public void onMessage(WebSocket webSocket, String text) {
                logger.debug("Received message: " + text);

                try {
                    ObjectMapper objectMapper = new ObjectMapper();
                    JsonNode jsonNode = objectMapper.readTree(text);

                    JsonNode typeNode = jsonNode.get("type");
                    if (typeNode != null && typeNode.asText().equals("data")) {
                        CompletableFuture.runAsync(() -> processData(jsonNode)); 
                    } else {
                        //System.out.println("Received non-data message: >" + typeNode.toPrettyString() + "<");
                        if (StringUtils.equalsIgnoreCase(typeNode.toPrettyString(), "\"connection_error\"")) {	            
                        	webSocket = initiateReconnection(client, request, this, webSocket);
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };

        WebSocket webSocket = restartWebSocket(client, request, webSocketListener);
        // Wait or sleep as needed (replace with your desired logic)
        try {
            Thread.sleep(70 * 60 * 1000); // Let the thread run for 70 minutes
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // Close the WebSocket connection
            if (webSocket != null) {
                webSocket.close(1000, "Closing WebSocket");
            }
            // Shutdown the client
            client.dispatcher().executorService().shutdown();
        }
}

protected static Request buildRequest() {
	Request request = new Request.Builder()
            .url("wss://streaming.bitquery.io/graphql")
            .addHeader("Sec-WebSocket-Protocol", "graphql-ws")
            .addHeader("X-API-KEY", Environment.BITQUERY_APIKEY)
            .build();
	return request;
}

protected static OkHttpClient buildHttpClient() {
	HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
    interceptor.setLevel(Level.BODY); // Set the desired log level

	OkHttpClient client = new OkHttpClient.Builder()
			.retryOnConnectionFailure(true)
		    .connectionPool(new ConnectionPool(5, 10, TimeUnit.MINUTES)) // Adjust maxIdleConnections and keepAliveDuration
		    .connectTimeout(60, TimeUnit.MINUTES)
		    .readTimeout(10, TimeUnit.MINUTES)
		    .writeTimeout(5, TimeUnit.MINUTES)
		    .addInterceptor(interceptor)
		    //.pingInterval(90, TimeUnit.SECONDS)
		    .build();
	return client;
}

// DO THE BUSINESS processing of the JsonRequest
public void processData(JsonNode jsonNode) {
	logger.debug("...Processing of: " + jsonNode.toPrettyString());
	logger.warn("THIS method must be overwritten by your own business logic ");
};

public WebSocket restartWebSocket(OkHttpClient client, Request request, WebSocketListener webSocketListener) {
    return client.newWebSocket(request, webSocketListener);
}

public WebSocket initiateReconnection(OkHttpClient client, Request request, WebSocketListener webSocketListener, 
		WebSocket webSocket) {
	CompletableFuture<WebSocket> futureWebSocket = CompletableFuture.supplyAsync(() -> {
        try {
            logger.error("__Connection error detected. Reconnecting ...");
            System.err.print(".");
            setSubscriptionId(getSubscriptionId() + 1); // Increase the subscription ID for safety
            
            // Close the previous WebSocket instance to prevent resource leaks
            if (webSocket != null) {
	            webSocket.close(50, "Closing OLD potentially open WebSocket");
            }
            
            // Restart WebSocket and return the new WebSocket instance
            WebSocket newWebSocket = restartWebSocket(client, request, webSocketListener);
            return newWebSocket;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    });
	try {
        // Wait for the CompletableFuture to complete and get the new WebSocket instance
        WebSocket newWebSocket = futureWebSocket.get();
        return newWebSocket;
    } catch (Exception e) {
        e.printStackTrace();
        return null;
    }
}

public OkHttpClient getClient() {
	return client;
}

public void setClient(OkHttpClient client) {
	this.client = client;
}

public  Request getRequest() {
	return request;
}

public  void setRequest(Request request) {
	this.request = request;
}

public String getQueryString() {
	return queryString;
}

public void setQueryString(String queryString) {
	this.queryString = queryString;
}

public JsonNode getJsonParameters() {
	return jsonParameters;
}

public void setJsonParameters(JsonNode jsonParameters) {
	this.jsonParameters = jsonParameters;
}

public int getSubscriptionId() {
	return subscriptionId;
}

public void setSubscriptionId(int subscriptionId) {
	this.subscriptionId = subscriptionId;
}

public String getParameterString () {
	if (jsonParamString == null) {
		try {
			return (new ObjectMapper()).writeValueAsString(getJsonParameters());
		} catch (JsonProcessingException e) {
			e.printStackTrace();
		}
	} else {
		return this.jsonParamString;		
	}
	return null;
}

}

1 Like