Let’s suppose you need to stream data from an external source, but that data source does not support push natively, so you are forced to resort to periodic polling. How do you implement this in Java, as simply as possible, while remaining responsive?
General idea
Before we dive into code, let’s discuss the requirements and the general idea first. What we need to do is basically trigger a fetch operation at a fixed rate, blocking the client until new data becomes available. Furthermore, let’s assume that we want to remain responsive, so we should not block indefinitely, but unblock after a given maximum amount of time has passed, after which the client can react accordingly (try again, abort, or do something else).
To fulfill these requirements, we will implement a variation of the Token Bucket Algorithm, which is commonly used for traffic shaping. In this algorithm, a fixed number of tokens is periodically placed in a virtual bucket of a specified capacity. Concurrently, another thread waiting to perform some operation (for example, sending a data packet over the network) checks the contents of the bucket; if it holds enough tokens, the thread removes them from the bucket and performs the operation. In this article, we will simplify the algorithm by simulating a bucket with a capacity of one and using only one consumer thread.
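To make the general algorithm concrete before simplifying it, here is a minimal sketch of a token bucket with an arbitrary capacity. The class name TokenBucket and its methods refill, tryConsume and available are made up for illustration and do not come from any library; the periodic refill is left to the caller.

```java
/** Illustrative token bucket; the class and method names are hypothetical. */
final class TokenBucket {
    private final int capacity;
    private int tokens = 0; // the bucket starts empty in this sketch

    TokenBucket(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Adds tokens, discarding any that would exceed the capacity. */
    synchronized void refill(int count) {
        tokens = Math.min(capacity, tokens + count);
    }

    /** Removes the requested tokens if enough are present; otherwise changes nothing. */
    synchronized boolean tryConsume(int count) {
        if (tokens < count) {
            return false;
        }
        tokens -= count;
        return true;
    }

    /** Returns the number of tokens currently in the bucket. */
    synchronized int available() {
        return tokens;
    }
}
```

With a capacity of one, the token count can only be 0 or 1, which is why the simplified version in this article gets by with a single boolean.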
Implementation
Since our bucket has a capacity of one, it has only two states (full and empty). This can be represented by a single boolean value, true meaning full and false meaning empty:
    private boolean fetch = true; // we start to fetch right away
Furthermore, we need to schedule a task that will “fill the bucket” periodically at a fixed rate. This is done using a ScheduledExecutorService:
    void start() {
        // a single worker thread is enough to fill the bucket
        ScheduledExecutorService es = Executors.newScheduledThreadPool(1);
        // first run after FETCH_INTERVAL, then every FETCH_INTERVAL milliseconds
        es.scheduleAtFixedRate(this::scheduleFetch, FETCH_INTERVAL, FETCH_INTERVAL, TimeUnit.MILLISECONDS);
    }
What does the scheduleFetch operation look like? It simply sets the fetch variable to true (fills the bucket) and notifies the other (fetching) thread, which might at that moment be waiting for the state of our bucket to change. For a discussion of why the next two methods must be synchronized, see this Stack Overflow question.
    synchronized void scheduleFetch() {
        fetch = true; // fill the bucket
        notify();     // wake up the fetching thread if it is waiting
    }
Next, we will provide an operation which will return immediately if the bucket is full or block for a given maximum amount of time, waiting for it to become full, returning the most recent state of the bucket, and emptying it eventually:
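    synchronized boolean awaitFetch() throws InterruptedException {
        if (!fetch)
            wait(WAIT_LIMIT); // bucket is empty: wait for a refill, but no longer than WAIT_LIMIT
        try {
            return fetch;     // report the most recent state of the bucket
        } finally {
            fetch = false;    // empty the bucket in any case
        }
    }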
Since the wait is bounded by WAIT_LIMIT, this method returns after roughly WAIT_LIMIT milliseconds at the latest (plus whatever time is needed to reacquire the monitor). We need this bound to ensure responsiveness, as we will see shortly. The return value tells the caller whether it is allowed to perform a fetch. Note that WAIT_LIMIT must be positive, because wait(0) blocks until notified.
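The Javadoc of Object.wait points out that a thread may wake up without being notified and before the timeout has elapsed (a so-called spurious wakeup). In awaitFetch above this merely causes an early return of false, which the client has to handle anyway. If you would rather use up the full time budget before giving up, one possible variant is a loop with a deadline. The sketch below assumes a hypothetical class FetchGate with the wait limit passed to the constructor; the field and method names mirror the snippets above.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical holder for the one-token bucket; names mirror the snippets above. */
final class FetchGate {
    private final long waitLimitNanos;
    private boolean fetch = true; // the bucket starts full

    FetchGate(long waitLimitMillis) {
        if (waitLimitMillis <= 0) {
            throw new IllegalArgumentException("wait limit must be positive");
        }
        this.waitLimitNanos = TimeUnit.MILLISECONDS.toNanos(waitLimitMillis);
    }

    synchronized void scheduleFetch() {
        fetch = true;
        notify();
    }

    synchronized boolean awaitFetch() throws InterruptedException {
        long deadline = System.nanoTime() + waitLimitNanos;
        while (!fetch) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                break; // time budget used up, give control back to the caller
            }
            // wait(0) would block until notified, so always wait at least 1 ms;
            // this can overshoot the deadline by up to one millisecond
            wait(Math.max(1, TimeUnit.NANOSECONDS.toMillis(remainingNanos)));
        }
        boolean result = fetch;
        fetch = false; // empty the bucket in any case
        return result;
    }
}
```

Whether the extra code is worth it depends on the client: a caller that simply retries on false is not harmed by an occasional early return.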
With this in place, and assuming that the actual fetch operation (sending a request over the network, interpreting the response etc.) is implemented in the doFetch method, we can finally implement our blocking poll method:
    List poll() throws InterruptedException {
        return awaitFetch() ? doFetch() : null;
    }
Here, null tells the client that no fresh data is available yet. In fact, this is the exact protocol Source Connectors in Kafka Connect are required to implement, and the described implementation is used in the PLC4X Source Connector.
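For reference, the snippets can be assembled into one self-contained class along the following lines. This is a sketch under several assumptions: the class name Poller, the concrete values of FETCH_INTERVAL and WAIT_LIMIT, the element type String, and the placeholder body of doFetch are all made up for illustration. The stop method is also an addition; without it, the executor's non-daemon thread would keep the JVM alive.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical assembly of the article's snippets; values and doFetch are placeholders. */
final class Poller {
    private static final long FETCH_INTERVAL = 200; // ms, illustrative value
    private static final long WAIT_LIMIT = 50;      // ms, illustrative value

    private ScheduledExecutorService es;
    private boolean fetch = true; // we start to fetch right away

    void start() {
        es = Executors.newScheduledThreadPool(1);
        es.scheduleAtFixedRate(this::scheduleFetch, FETCH_INTERVAL, FETCH_INTERVAL, TimeUnit.MILLISECONDS);
    }

    /** Not part of the article: stops the periodic task so the JVM can exit. */
    void stop() {
        es.shutdownNow();
    }

    synchronized void scheduleFetch() {
        fetch = true;
        notify();
    }

    synchronized boolean awaitFetch() throws InterruptedException {
        if (!fetch)
            wait(WAIT_LIMIT);
        try {
            return fetch;
        } finally {
            fetch = false;
        }
    }

    /** Placeholder for the real request to the data source. */
    List<String> doFetch() {
        return Collections.singletonList("sample@" + System.currentTimeMillis());
    }

    List<String> poll() throws InterruptedException {
        return awaitFetch() ? doFetch() : null;
    }

    public static void main(String[] args) throws InterruptedException {
        Poller poller = new Poller();
        poller.start();
        try {
            for (int i = 0; i < 10; i++) {
                List<String> data = poller.poll();
                System.out.println(data == null ? "no fresh data yet" : "fetched " + data);
            }
        } finally {
            poller.stop();
        }
    }
}
```

The client loop in main regains control after every call to poll, at the latest after about WAIT_LIMIT milliseconds, and can decide on each null whether to retry or stop.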
Remarks
There are two main parameters in this program: WAIT_LIMIT and FETCH_INTERVAL. The former controls the responsiveness of the client: the lower the WAIT_LIMIT, the sooner control is returned to the client when no fresh data is available.
The latter controls the maximum request (sampling) rate. It is an upper bound, because the effective sampling rate is lower whenever the fetch operation takes longer to execute than FETCH_INTERVAL.
Alternatives
Although the proposed solution works, there are alternatives. One such alternative is to fetch data in the scheduled periodic task directly instead of notifying the fetching (client) thread. However, since we need to block the client thread waiting for fresh data, we must pass fetched results from the periodic task back to the client, for example through a BlockingQueue.
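A rough sketch of this queue-based alternative could look as follows. The class name QueuePoller, the parameter values, the String element type and the body of doFetch are assumptions for illustration. Note one behavioral difference: here the data source is queried on every tick whether or not the client asks for data, and a batch is dropped if the previous one has not been consumed yet.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical queue-based variant; names and values are illustrative. */
final class QueuePoller {
    private static final long FETCH_INTERVAL = 200; // ms, illustrative value
    private static final long WAIT_LIMIT = 50;      // ms, illustrative value

    // Capacity of one: at most one unconsumed batch is kept.
    private final BlockingQueue<List<String>> results = new ArrayBlockingQueue<>(1);
    private ScheduledExecutorService es;

    void start() {
        es = Executors.newScheduledThreadPool(1);
        // initial delay of 0 so that the first fetch happens right away
        es.scheduleAtFixedRate(this::fetchAndPublish, 0, FETCH_INTERVAL, TimeUnit.MILLISECONDS);
    }

    void stop() {
        es.shutdownNow();
    }

    private void fetchAndPublish() {
        try {
            // offer() does not block; it drops the new batch if the queue is still full
            results.offer(doFetch());
        } catch (RuntimeException e) {
            // an exception escaping the task would suppress all subsequent executions
            System.err.println("fetch failed: " + e);
        }
    }

    /** Placeholder for the real request to the data source. */
    List<String> doFetch() {
        return Collections.singletonList("sample@" + System.currentTimeMillis());
    }

    /** Returns the next batch, or null if none arrives within WAIT_LIMIT. */
    List<String> poll() throws InterruptedException {
        return results.poll(WAIT_LIMIT, TimeUnit.MILLISECONDS);
    }
}
```

Dropping the newest batch when the queue is full is only one possible policy; replacing the old batch instead would favor freshness over ordering.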
Another alternative is to use a ready-made utility class for this kind of task, for example RateLimiter from the Google Guava library. This would simplify the implementation even more. However, you will have to add another library dependency to your project, which, depending on circumstances, might or might not be appropriate for you.
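If an extra dependency is not an option, the standard library offers building blocks for a similar effect. As one possibility, the one-token bucket can be expressed with a java.util.concurrent.Semaphore holding at most one permit. The class name SemaphoreGate is hypothetical, and the cap of one permit relies on the assumption that only a single thread ever calls scheduleFetch.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical Semaphore-based bucket; assumes a single thread calls scheduleFetch. */
final class SemaphoreGate {
    private final Semaphore bucket = new Semaphore(1); // one permit: the bucket starts full
    private final long waitLimitMillis;

    SemaphoreGate(long waitLimitMillis) {
        this.waitLimitMillis = waitLimitMillis;
    }

    /** Fills the bucket; the check keeps the number of permits at one at most. */
    void scheduleFetch() {
        if (bucket.availablePermits() == 0) {
            bucket.release();
        }
    }

    /** Takes the permit if it becomes available within the wait limit. */
    boolean awaitFetch() throws InterruptedException {
        return bucket.tryAcquire(waitLimitMillis, TimeUnit.MILLISECONDS);
    }
}
```

The two methods play the same roles as their synchronized counterparts, with tryAcquire taking care of the timed wait.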
Conclusion
Simple responsive polling can be implemented surprisingly easily by employing a variation of the Token Bucket Algorithm, using two low-level synchronization primitives of the Java platform: wait and notify. Although common knowledge dictates that you should never ever mess with basic synchronization primitives and use abstractions in java.util.concurrent instead, this example demonstrates that sometimes it is OK to break the rules, if it gets the job done.
Blog author
Andrey Skorikov