Solution for Programming Exercise 12.3
This page contains a sample solution to one of the exercises from Introduction to Programming Using Java.
Exercise 12.3:
In the previous exercise, you divided up a large task into a small number of large pieces and created a thread to execute each task. Because of the nature of the problem, this meant that some threads had much more work to do than others -- it is much easier to find the number of divisors of a small number than it is of a big number. As discussed in Subsection 12.3.1, a better approach is to break up the problem into a fairly large number of smaller problems. Subsection 12.3.2 shows how to use a thread pool to execute the tasks: Each thread in the pool runs in a loop in which it repeatedly takes a task from a queue and carries out that task. Implement a thread pool strategy for solving the same maximum-number-of-divisors problem as in the previous exercise.
To make things even more interesting, you should try a new technique for combining the results from all the tasks: Use two queues in your program. Use a queue of tasks, as usual, to hold the tasks that will be executed by the thread pool (Subsection 12.3.2). But also use a queue of results produced by the threads. When a task completes, the result from that task should be placed into the result queue. The main program can read results from the second queue as they become available, and combine all the results to get the final answer. The result queue will have to be a blocking queue (Subsection 12.3.3), since the main program will have to wait for results to become available. Note that the main program knows the exact number of results that it expects to read from the queue, so it can do so in a for loop; when the for loop completes, the main program knows that all the tasks have been executed.
I ran the solution to Exercise 12.2 on a computer with four processors, using four threads. The four threads took widely varying amounts of time to complete: 2.6 seconds, 6.4 seconds, 12.9 seconds, and 20.4 seconds. The program ran for just over 20.4 seconds. For a lot of that time, there were only one or two threads running, leaving several processors idle. By using a thread pool as required by the current exercise, I was able to bring the total time for solving the problem down to 10.9 seconds. With the thread pool, the job was divided more evenly among the processors, and none of the processors were idle for very long.
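The imbalance between the four threads can be estimated with a little arithmetic. The sketch below assumes, as in the naive divisor count used in this program, that counting the divisors of N takes about N trial divisions, and it assumes that the four threads were given equal quarters of the range 1 to 100000. The class and method names are made up for the illustration.

```java
class WorkEstimateSketch {

    /**
     * Estimated number of trial divisions needed to count the divisors
     * of every integer in the range start..end (inclusive), assuming
     * that an integer n costs n trial divisions.
     */
    static long iterations(int start, int end) {
        long total = 0;
        for (int n = start; n <= end; n++)
            total += n;
        return total;
    }

    public static void main(String[] args) {
        long firstQuarter = iterations(1, 25000);
        for (int q = 0; q < 4; q++) {
            int start = q * 25000 + 1;
            int end = (q + 1) * 25000;
            long work = iterations(start, end);
            System.out.printf("%6d to %6d: %d trial divisions, %.2f times the first quarter%n",
                    start, end, work, (double) work / firstQuarter);
        }
    }
}
```

Under these assumptions the work in the four quarters is in the ratio of roughly 1 : 3 : 5 : 7, which is broadly in line with the measured times of 2.6, 6.4, 12.9, and 20.4 seconds.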
In my solution, I decided to use a ConcurrentLinkedQueue for the task queue and a LinkedBlockingQueue for the result queue. Both of these classes are discussed in Section 12.3, and a ConcurrentLinkedQueue is used for the task queue in the sample program MultiprocessingDemo2.java from that section. (I could have used a blocking queue for the task queue too, as I'll discuss below.)
    private static ConcurrentLinkedQueue<Task> taskQueue;
    private static LinkedBlockingQueue<Result> resultQueue;
We need objects to put in the queues, to represent the tasks and the results. I decided to write a nested class to represent tasks and another nested class to represent results. A task is assigned a range of integers and has to find the largest number of divisors for any integer in that range. We also need to know which integer gave that maximum number. When the task is completed, the results from the task have to be added to the result queue. Here are the Task and Result classes:
    private static class Task {
        int min, max; // Start and end of the range of integers for this task.
        Task(int min, int max) {
            this.min = min;
            this.max = max;
        }
        public void compute() {
            int maxDivisors = 0;
            int whichInt = 0;
            for (int i = min; i <= max; i++) {  // "<=" so that max itself is tested
                int divisors = countDivisors(i);
                if (divisors > maxDivisors) {
                    maxDivisors = divisors;
                    whichInt = i;
                }
            }
            resultQueue.add( new Result(maxDivisors, whichInt) );
        }
    }

    private static class Result {
        int maxDivisorFromTask;  // Maximum number of divisors found.
        int intWithMaxFromTask;  // Which integer gave that maximum number.
        Result(int maxDivisors, int whichInt) {
            maxDivisorFromTask = maxDivisors;
            intWithMaxFromTask = whichInt;
        }
    }
The big idea when using a thread pool and load balancing is to divide up the overall problem into a fairly large number of fairly small (but not too small) subtasks. In my program, each task is assigned a range of 1000 integers to work on. Since there are 100000 integers in the overall problem, there will be 100 subtasks. The question of how big the subtasks should be does not have a definite answer, but this choice seemed to work well, since the work was divided pretty evenly among the processors.
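As a minimal sketch of that division of the problem, the following computes the range of integers for each subtask. The class name and the splitIntoRanges method are invented for this illustration, and both endpoints of each range are treated as inclusive.

```java
import java.util.ArrayList;
import java.util.List;

class TaskRangeSketch {

    /**
     * Splits the integers 1..max into consecutive ranges of taskSize
     * integers each.  Each range is returned as {start, end}, with both
     * endpoints included.  The last range is shorter when max is not an
     * exact multiple of taskSize.
     */
    static List<int[]> splitIntoRanges(int max, int taskSize) {
        List<int[]> ranges = new ArrayList<>();
        int numberOfTasks = (max + taskSize - 1) / taskSize;  // round up
        for (int i = 0; i < numberOfTasks; i++) {
            int start = i * taskSize + 1;
            int end = Math.min((i + 1) * taskSize, max);
            ranges.add(new int[] { start, end });
        }
        return ranges;
    }

    public static void main(String[] args) {
        List<int[]> ranges = splitIntoRanges(100000, 1000);
        int[] first = ranges.get(0);
        int[] last = ranges.get(ranges.size() - 1);
        System.out.println("Number of tasks: " + ranges.size());
        System.out.println("First task: " + first[0] + " to " + first[1]);
        System.out.println("Last task:  " + last[0] + " to " + last[1]);
    }
}
```

With 100000 integers and a task size of 1000, this gives 100 ranges, from 1 to 1000 up through 99001 to 100000.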
Once the tasks have been created and the threads have been started, the main thread has to collect and process all the results from the tasks. As each task completes, it places its result into the result queue. A result can be removed from the queue by calling resultQueue.take(), which will block if the queue is empty until a result becomes available. So the for loop that processes results cannot complete until every task has been completed and has delivered its result:

    int maxDivisorCount = 0;         // Overall maximum found by any task.
    int intWithMaxDivisorCount = 0;  // Which integer gave that maximum?
    for (int i = 0; i < numberOfTasks; i++) {
        try {
            Result result = resultQueue.take();
            if (result.maxDivisorFromTask > maxDivisorCount) { // new maximum.
                maxDivisorCount = result.maxDivisorFromTask;
                intWithMaxDivisorCount = result.intWithMaxFromTask;
            }
        }
        catch (InterruptedException e) {
            // Won't happen in this program!
        }
    }
Once the for loop ends, the final answers, taking into account the results from every thread, are in the variables maxDivisorCount and intWithMaxDivisorCount.
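The same pattern can be tried out in isolation. The following is only a sketch, with made-up names and a trivial computation (squaring a number) standing in for the real task. It uses one thread per value rather than a thread pool, since the point is just the result queue: the main thread calls take() exactly as many times as there are results to collect.

```java
import java.util.concurrent.LinkedBlockingQueue;

class ResultQueueSketch {

    /**
     * Starts one thread per value.  Each thread adds the square of its
     * value to a shared result queue.  The calling thread then takes
     * exactly values.length results and returns the largest one.
     */
    static int maxOfSquares(int[] values) throws InterruptedException {
        LinkedBlockingQueue<Integer> results = new LinkedBlockingQueue<>();
        for (int v : values) {
            final int value = v;
            new Thread(() -> results.add(value * value)).start();
        }
        int max = Integer.MIN_VALUE;
        for (int i = 0; i < values.length; i++) {
            int r = results.take();  // blocks until some thread has added a result
            if (r > max)
                max = r;
        }
        return max;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] values = { 3, -7, 5, 2 };
        System.out.println("Largest square: " + maxOfSquares(values));
    }
}
```

The results can arrive in any order, but that does not matter here because taking a maximum does not depend on the order in which the values are combined.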
My solution uses a ConcurrentLinkedQueue for the task queue. This is a little touchy, since we have to be careful not to start the threads until the tasks are in the queue. This is because the threads are programmed to terminate as soon as they see an empty queue:
    private static class CountDivisorsThread extends Thread {
        public void run() {
            while (true) {
                Task task = taskQueue.poll();
                if (task == null)
                    break; // queue is empty; terminate
                task.compute();
            }
        }
    }
If the queue is empty when the thread starts, it will terminate immediately. Furthermore, terminating the threads after the job is done is fine in this program, but often when we use a thread pool, we want to keep the threads around to work on more than one job.
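This behavior is easy to observe in a small experiment. The sketch below is not part of the solution; its names are invented, and its tasks are plain Runnable objects that do nothing. It runs a worker with the same poll-until-null loop once on an empty queue and once after tasks have been added.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

class EmptyQueueSketch {

    /**
     * Runs one worker thread on the queue, waits for the thread to
     * terminate, and returns the number of tasks that it executed.
     */
    static int runWorker(ConcurrentLinkedQueue<Runnable> queue) throws InterruptedException {
        AtomicInteger executed = new AtomicInteger();
        Thread worker = new Thread(() -> {
            while (true) {
                Runnable task = queue.poll();  // returns null at once if the queue is empty
                if (task == null)
                    break;
                task.run();
                executed.incrementAndGet();
            }
        });
        worker.start();
        worker.join();
        return executed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
        int early = runWorker(queue);  // started before any task was added
        for (int i = 0; i < 5; i++)
            queue.add(() -> { });      // five tasks that do nothing
        int late = runWorker(queue);   // started after the tasks were added
        System.out.println("Worker started on an empty queue ran " + early + " tasks.");
        System.out.println("Worker started after queuing ran " + late + " tasks.");
    }
}
```

The first worker finds nothing in the queue and exits without doing any work; the second one executes all five tasks before it sees an empty queue.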
One solution is to use a LinkedBlockingQueue for the task queue, instead of a ConcurrentLinkedQueue. When a thread tries to take an item from an empty blocking queue, it waits for an item to become available in the queue. We can then start the threads whenever we want, and they will wait, if necessary, for tasks to be added to the queue. If we had more jobs for the thread pool, the threads would just wait around between jobs and would become active again as soon as tasks from a new job are enqueued. Here is a version of the thread class that will work with a blocking queue of tasks:

    // Assumes that blockingTaskQueue is a static variable
    // of type LinkedBlockingQueue<Task>.
    private static class CountDivisorsThread extends Thread {
        CountDivisorsThread() {
            setDaemon(true);
        }
        public void run() {
            while (true) {
                try {
                    Task task = blockingTaskQueue.take();
                    task.compute();
                }
                catch (InterruptedException e) {
                }
            }
        }
    }
Note that I've made the thread into a daemon thread. This will allow the Java Virtual Machine to exit, even though the thread still exists, as long as all the non-daemon threads have exited. For this exercise, if the threads were not daemon threads, the program would not actually end after completing the job, since the non-daemon thread-pool threads would still be around. (However, you could still make the program end by calling System.exit().)
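To see the whole arrangement work, here is a minimal, self-contained sketch with two blocking queues and daemon worker threads that are started before any tasks exist. It is an illustration only: the names are invented, the "task" is just an integer to be squared, and, unlike the class above, the try..catch is placed around the loop so that an interrupt ends the thread.

```java
import java.util.concurrent.LinkedBlockingQueue;

class BlockingPoolSketch {

    /**
     * Computes 1*1 + 2*2 + ... + n*n using a pool of daemon threads
     * that take their tasks from a blocking queue.
     */
    static int sumOfSquares(int numberOfThreads, int n) throws InterruptedException {
        LinkedBlockingQueue<Integer> tasks = new LinkedBlockingQueue<>();
        LinkedBlockingQueue<Integer> results = new LinkedBlockingQueue<>();
        for (int i = 0; i < numberOfThreads; i++) {
            Thread worker = new Thread(() -> {
                try {
                    while (true) {
                        int k = tasks.take();  // waits while the task queue is empty
                        results.add(k * k);
                    }
                }
                catch (InterruptedException e) {
                    // Interrupted while waiting; let the thread end.
                }
            });
            worker.setDaemon(true);  // does not keep the JVM from exiting
            worker.start();          // started BEFORE any tasks are in the queue
        }
        for (int k = 1; k <= n; k++)
            tasks.add(k);
        int sum = 0;
        for (int i = 0; i < n; i++)
            sum += results.take();   // exactly n results are expected
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Sum of squares from 1 to 10: " + sumOfSquares(4, 10));
    }
}
```

When main() returns, the worker threads are still blocked in take(), but because they are daemon threads the program ends anyway.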
Here is the complete program:

    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    /**
     * This program finds the number in the range 1 to some maximum that has the
     * largest number of divisors.  It prints that number and the number of divisors
     * that it has.  Note that there might be several numbers that have the maximum
     * number of divisors.  Only one of them is output.
     *
     * The program's work is divided into a large number of tasks that are executed
     * by a thread pool.  Each task consists of finding the maximum number of
     * divisors among a sequence of 1000 integers.
     */
    public class CountDivisorsUsingThreadPool {

        /**
         * The upper limit of the range of integers that is to be tested.
         * (This must be a fairly large multiple of 1000 for the thread
         * pool load-balancing strategy to be effective.)
         */
        private final static int MAX = 100000;

        /**
         * A queue to hold the tasks.  Tasks are represented as objects
         * of type Task, a nested class that is defined below.  Note
         * that queue operations must be synchronized because the
         * queue is used by multiple threads.  A ConcurrentLinkedQueue
         * handles synchronization internally.
         */
        private static ConcurrentLinkedQueue<Task> taskQueue;

        /**
         * A queue to hold the results from the tasks.  Results
         * are defined by the nested class, Result, which is defined
         * below.  This is a blocking queue since the thread
         * that takes results from the queue should block when
         * the queue is empty until a result becomes available.
         * (Note:  The Task class could have been used to represent
         * results as well; I am using a separate Result class
         * for clarity in this example.)
         */
        private static LinkedBlockingQueue<Result> resultQueue;

        /**
         * A class to represent the task of finding the number in
         * a given range of integers that has the largest number of
         * divisors.  The range is specified in the constructor.
         * The task is executed when the compute() method is
         * called.  At the end of the compute() method, a Result
         * object is created to represent the results from this
         * task, and the result object is added to resultQueue.
         */
        private static class Task {
            int min, max; // Start and end of the range of integers for this task.
            Task(int min, int max) {
                this.min = min;
                this.max = max;
            }
            public void compute() {
                int maxDivisors = 0;
                int whichInt = 0;
                for (int i = min; i <= max; i++) {  // "<=" so that max itself is tested
                    int divisors = countDivisors(i);
                    if (divisors > maxDivisors) {
                        maxDivisors = divisors;
                        whichInt = i;
                    }
                }
                resultQueue.add( new Result(maxDivisors, whichInt) );
            }
        }

        /**
         * A class to represent the result from one task.  The
         * result consists of the maximum number of divisors in
         * the range of integers assigned to that task, and the
         * integer in the range that gave the maximum number of
         * divisors.
         */
        private static class Result {
            int maxDivisorFromTask;  // Maximum number of divisors found.
            int intWithMaxFromTask;  // Which integer gave that maximum number.
            Result(int maxDivisors, int whichInt) {
                maxDivisorFromTask = maxDivisors;
                intWithMaxFromTask = whichInt;
            }
        }

        /**
         * A thread belonging to this class is one of the threads in the
         * thread pool.  It repeatedly removes a task from the task queue
         * and executes it by calling the task's compute() method.  The
         * thread terminates when it finds that the task queue is empty.
         */
        private static class CountDivisorsThread extends Thread {
            public void run() {
                while (true) {
                    Task task = taskQueue.poll();
                    if (task == null)
                        break;
                    task.compute();
                }
            }
        }

        /**
         * Finds the number in the range 1 to MAX that has the largest number of
         * divisors, dividing the work into tasks that will be executed by threads
         * in a thread pool.  This method creates the task and result queues.
         * It adds all the tasks to the task queue.  Then it creates the threads
         * for the thread pool and starts them.  (Note that this must be done
         * AFTER all the tasks are in the task queue, since the threads exit
         * when they see an empty queue.)  Finally, it reads results from
         * the result queue and combines them to get the overall answer.
         * @param numberOfThreads the number of threads in the thread pool.
         */
        private static void countDivisorsWithThreads(int numberOfThreads) {

            System.out.println("\nCounting divisors using " +
                                            numberOfThreads + " threads...");

            /* Create the queues and the thread pool, but don't start
             * the threads yet. */

            long startTime = System.currentTimeMillis();
            resultQueue = new LinkedBlockingQueue<Result>();
            taskQueue = new ConcurrentLinkedQueue<Task>();
            CountDivisorsThread[] workers = new CountDivisorsThread[numberOfThreads];
            for (int i = 0; i < workers.length; i++)
                workers[i] = new CountDivisorsThread();

            /* Create the tasks and add them to the task queue.  Each
             * task consists of a range of 1000 integers, so the number of
             * tasks is (MAX+999)/1000.  (The "+999" gives the correct number
             * of tasks when MAX is not an exact multiple of 1000.  The last
             * task in that case will consist of the last (MAX%1000) ints.) */

            int numberOfTasks = (MAX + 999) / 1000;
            for (int i = 0; i < numberOfTasks; i++) {
                int start = i*1000 + 1;
                int end = (i+1)*1000;
                if (end > MAX)
                    end = MAX;
                //System.out.println(start + " " + end);  // for testing
                taskQueue.add( new Task(start,end) );
            }

            /* Now that the tasks are in the task queue, start the threads. */

            for (int i = 0; i < numberOfThreads; i++)
                workers[i].start();

            /* The threads will execute the tasks and results will be placed
             * into the result queue.  This method now goes on to read all
             * the results from the result queue and combine them to give
             * the overall answer. */

            int maxDivisorCount = 0;         // Overall maximum found by any task.
            int intWithMaxDivisorCount = 0;  // Which integer gave that maximum?
            for (int i = 0; i < numberOfTasks; i++) {
                try {
                    Result result = resultQueue.take();
                    if (result.maxDivisorFromTask > maxDivisorCount) { // new maximum.
                        maxDivisorCount = result.maxDivisorFromTask;
                        intWithMaxDivisorCount = result.intWithMaxFromTask;
                    }
                }
                catch (InterruptedException e) {
                }
            }

            /* Report the results. */

            long elapsedTime = System.currentTimeMillis() - startTime;
            System.out.println("\nThe largest number of divisors " +
                    "for numbers between 1 and " + MAX + " is " + maxDivisorCount);
            System.out.println("An integer with that many divisors is " +
                    intWithMaxDivisorCount);
            System.out.println("Total elapsed time:  " +
                    (elapsedTime/1000.0) + " seconds.\n");

        } // end countDivisorsWithThreads()

        /**
         * The main() routine just gets the number of threads from the user and
         * calls countDivisorsWithThreads() to do the actual work.
         * (TextIO is the non-standard input class that is used in the textbook.)
         */
        public static void main(String[] args) {
            int numberOfThreads = 0;
            while (numberOfThreads < 1 || numberOfThreads > 10) {
                System.out.print("How many threads do you want to use  (1 to 10) ?  ");
                numberOfThreads = TextIO.getlnInt();
                if (numberOfThreads < 1 || numberOfThreads > 10)
                    System.out.println("Please enter a number from 1 to 10 !");
            }
            countDivisorsWithThreads(numberOfThreads);
        }

        /**
         * Finds the number of divisors of the integer N.  Note that this method does
         * the counting in a stupid way, since it tests every integer in the range
         * 1 to N to see whether it evenly divides N.
         */
        private static int countDivisors(int N) {
            int count = 0;
            for (int i = 1; i <= N ; i++) {
                if ( N % i == 0 )
                    count ++;
            }
            return count;
        }

    } // end CountDivisorsUsingThreadPool