Wednesday, November 7, 2012

Thread Pool in Java

Here is a small code snippet which shows how we can implement Thread Pools in Java

What does the application do ?

It creates multiple threads (one can define the pool size) and each thread is responsible for calculating the size of a specified folder. If a folder has multiple folders/files and sub-folders inside them, it recursively goes into each one of them and uses the existing thread pool to re-use the threads to calculate the folder size. Thread pools limit the number of threads that are created - too many threads will start thrashing the CPU and it will hamper the performance. The code is somewhat working :-)

File: DeniedExecHandlerImpl.java

package com.giri.interview;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

// in case the queue is full - the task is rejected and this function is called when a particular task is rejected

public class DeniedExecHandlerImpl implements RejectedExecutionHandler {

 @Override
 public void rejectedExecution(Runnable arg0, ThreadPoolExecutor arg1) {
  System.out.println("\nDENIED: " + arg0.toString() + " thread has not been accepted for execution !");
 }
}

File: MainClass.java

package com.giri.interview;

import java.io.File;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MainClass {
 
 public static long CALCULATED_FOLDER_SIZE = 0;
 
 private static int q_size = 30000;      //------------------->> this has to be a large number
 private static int max_pool_size = 10;  //------------------->> Number of parallel threads
 
 private static BlockingQueue bq_runnable = new ArrayBlockingQueue(q_size);
 private static RejectedExecutionHandler deny_handler = new DeniedExecHandlerImpl();
 
 private static ThreadPoolExecutor main_executor = null;
 private static Thread main_monitor = null;
 
 
 public static MainVariables mainVars = new MainVariables();
 
 public static void main(String[] args) {
  
  main_executor = new ThreadPoolExecutor( max_pool_size, 
            max_pool_size, 
            10000, 
            TimeUnit.SECONDS, 
            bq_runnable, 
            deny_handler);
  
  main_executor.allowCoreThreadTimeOut(false);
  
  main_monitor = new Thread(new MainMonitorThread(main_executor));
  main_monitor.setDaemon(true);
  main_monitor.start();
  
    
  LinkedList allDirs = new LinkedList();
  String dirPath = "/Users/apple";
  allDirs = Utils.getAllFoldersAndSubFolders(new File(dirPath));
  System.out.println("\n Total number of directories in the directory path : " + dirPath + " = "+ allDirs.size());
  
  int t_count = 0;
  
  for(File d : allDirs) {
   String countStr = Integer.toString(t_count++);
   main_executor.execute(new ThreadedFolderSizeCalculator(countStr, d));
  }
 }
}

File: MainMonitorThread.java

package com.giri.interview;

import java.util.concurrent.ThreadPoolExecutor;


// this is the main moniting thread - which keeps dumping some important info on stdout

public class MainMonitorThread implements Runnable {
 
 private ThreadPoolExecutor exec;
 
 public MainMonitorThread(ThreadPoolExecutor e) {
  exec = e;
 }

 @Override
 public void run() {
  try {
   while(true) {
    
    System.out.println("\n @ Monitor @ >> "
      + "Pool-Size=["            + exec.getPoolSize()           + "] "
      + "Core-Pool-Size=["       + exec.getCorePoolSize()       + "] "
      + "Active-Count=["         + exec.getActiveCount()        + "] "
      + "Completed-Task-Count=[" + exec.getCompletedTaskCount() + "] "
      + "Task-Count=["           + exec.getTaskCount()          + "] "
      + "Shut-Down=["            + exec.isShutdown()            + "] "
      + "Is-Terminated=["        + exec.isTerminated()          + "] "
    );
    Thread.sleep(1000);
   } // while
  } catch(Exception e) {
   System.out.println("Error in MainMonitorThread !");
   e.printStackTrace();
  }

 }

}

File: MainVariables.java

package com.giri.interview;

public class MainVariables {
 
 // this class holds certain important variables
 
 // total file size (of all files in all directories)
 private long TOTAL_SIZE = 0;
 
 private long NUM_FILES = 0;
 private long NUM_FOLDERS = 0;
 
 public synchronized long getTOTAL_SIZE() {
  return TOTAL_SIZE;
 }
 public synchronized void setTOTAL_SIZE(long tOTAL_SIZE) {
  TOTAL_SIZE = tOTAL_SIZE;
 }
 public synchronized long getNUM_FILES() {
  return NUM_FILES;
 }
 
 public synchronized void setNUM_FILES(long nUM_FILES) {
  NUM_FILES = nUM_FILES;
 }
 
 public synchronized long getNUM_FOLDERS() {
  return NUM_FOLDERS;
 }
 
 public synchronized void setNUM_FOLDERS(long nUM_FOLDERS) {
  NUM_FOLDERS = nUM_FOLDERS;
 }
 
 public synchronized void increaseNUM_FOLDERS_byOne() {
  NUM_FOLDERS = NUM_FOLDERS + 1;
 }
 
 public synchronized void increaseNUM_FILES_byOne() {
  NUM_FILES = NUM_FILES +1;
 }
 
 public synchronized void increaseTOTAL_SIZE(long val) {
  TOTAL_SIZE = TOTAL_SIZE + val;
 }
 
   
}

File: ThreadedFolderSizeCalculator

package com.giri.interview;

import java.io.File;

public class ThreadedFolderSizeCalculator implements Runnable{
 
 //our thread name
 private String name;
 
 // the file object
 private File file;
 
 // constructor
 public ThreadedFolderSizeCalculator(String n) {
  name = n;
 }
 
 // constructor
 public ThreadedFolderSizeCalculator(String n, File f) {
  file = f;
  name = n;
 }
 
 // actual function that is executed - which is - the thread calculates the specified directory size
 @Override
 public void run() {
  System.out.println("\n [Thread-"+name+"] is running !");
  
  Utils.getFolderSize(file);
  
  System.out.println("\n [Thread-"+name+"] is done !");
 }
 
 @Override
 public String toString() {
  return name;
 }

}

File: Utils

package com.giri.interview;

import java.awt.List;
import java.io.File;
import java.io.FileFilter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;


public class Utils {
 
 // this has the entire folder/directory list
 public static LinkedList utils_linkedlist = new LinkedList();
 
 // this fetches the folder size - for a given directory
 // if a directory called "keshav" (Directory points to "/home/keshav") and 
 // inside that if there are 3 files and 2 directories
 // 1. foo.txt - 100 bytes
 // 2. hello.doc - 200 bytes
 // 3. foo.exe - 300 bytes
 // 4. Interviews/      (directory - which may have folders or files inside them)
 // 5. SourceCode/      (directory - which may have files or folders inside them)
 //   Calling the below function on the directory "keshav" should return 100+200+300 = 600 bytes
 public static long getFolderSize(File folder) {
  long totalSize = 0;
  
  System.out.println("\nFolder="+folder.getName());
  
  if(!folder.isDirectory()) {
   return 0;
  }
  
  File[] filesInFolder = folder.listFiles();
  
  for(int i = 0; i< filesInFolder.length; i++ ) {
   
   if(filesInFolder[i].isFile()) {
    MainClass.mainVars.increaseNUM_FILES_byOne();
    totalSize = totalSize + filesInFolder[i].length();
    MainClass.mainVars.increaseTOTAL_SIZE(filesInFolder[i].length());
   }
  }
  System.out.println("\nSize of the current folder ("+folder.getName()+")="+totalSize);
  System.out.println("\nCurrent Total File Size="+MainClass.mainVars.getTOTAL_SIZE());
  System.out.println("\n-------------------------");
  return totalSize;
 }
 
 // this recursive function - populates our list - with all directories and sub-directories
 public static LinkedList getAllFoldersAndSubFolders(File dirPath) {
  
  File[] files = dirPath.listFiles();
  for(File f : files ) {
   if(f.isDirectory()) {
    utils_linkedlist.add(f);
    getAllFoldersAndSubFolders(f);
   }
  }
  return utils_linkedlist;
 }
}

No comments :

Post a Comment