Tag Archives: java multithreading

The number of control threads and concurrency number of concurrent executor of Java starting gun

introduction

the first time when studying the singleton pattern, useful to the multithreading concurrency test thread-safe singleton pattern. However, I was pressed for time at that time and did not record it, so I made a special record today.

1, look at the code

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;


/**
* @author :Jarvisy
* @date :Created in 2020/9/16 1:20
* @description :
*/
public class ConcurrentExecutor {
    /**
     * @param runHandler
     * @param executeCount    发起请求总数
     * @param concurrentCount 同时并发执行的线程数
     * @throws Exception
     */
    public static void execute(final RunHandler runHandler, int executeCount, int concurrentCount) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        //控制信号量,此处用于控制并发的线程数
        final Semaphore semaphore = new Semaphore(concurrentCount);
        //闭锁,可实现计数量递减
        final CountDownLatch countDownLatch = new CountDownLatch(executeCount);
        for (int i = 0; i < executeCount; i++) {
            executorService.execute(new Runnable() {
                public void run() {
                    try {
                        //执行此方法用于获取执行许可,当总计未释放的许可数不超过executeCount时,
                        //则允许同性,否则线程阻塞等待,知道获取到许可
                        semaphore.acquire();
                        runHandler.handler();//回调函数
                        //释放许可
                        semaphore.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();//线程阻塞,知道闭锁值为0时,阻塞才释放,继续往下执行
        executorService.shutdown();
    }


    public interface RunHandler {
        void handler();
    }
}

2, Executors

Java four thread pool can be created by Executors:

  1. newCachedThreadPool creates a cacheable thread pool. If the length of the thread pool exceeds the processing requirement, the free thread can be reclaimed flexibly. If no thread can be retrieved, a new thread can be created.
  2. newFixedThreadPool creates a fixed-length thread pool that controls the maximum number of concurrent threads. The newScheduledThreadPool creates a fixed-length thread pool that supports timed and periodic task execution while waiting in the queue for
  3. .
  4. newSingleThreadExecutor creates a single-threaded thread pool that executes tasks with only a unique worker thread, ensuring that all tasks are executed in the specified order (FIFO, LIFO, priority).

The first type: newCachedThreadPool
thread pool is infinite. When the second task is executed and the first task has been completed, the thread executing the first task is reused instead of creating a new thread each time.

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            try {
                Thread.sleep(index * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            cachedThreadPool.execute(new Runnable() {
                public void run() {
                    System.out.println(index);
                }
            });
        }

second: newFixedThreadPool
because the thread pool size is 3, each task output index after sleep 2 seconds, so every two seconds to print 3 Numbers.
the size of the fixed-length thread pool is best set based on system resources. Such as the Runtime. GetRuntime (). AvailableProcessors ()

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);  
  for (int i = 0; i < 10; i++) {  
   final int index = i;  
   fixedThreadPool.execute(new Runnable() {  
    public void run() {  
     try {  
      System.out.println(index);  
      Thread.sleep(2000);  
     } catch (InterruptedException e) {  
      e.printStackTrace();  
     }  
    }  
   });

NewScheduledThreadPool

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);  
  scheduledThreadPool.scheduleAtFixedRate(new Runnable() {  
   public void run() {  
    System.out.println("delay 1 seconds, and excute every 3 seconds");  
   }  
  }, 1, 3, TimeUnit.SECONDS);

Fourth: newSingleThreadExecutor

ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();  
  for (int i = 0; i < 10; i++) {  
   final int index = i;  
   singleThreadExecutor.execute(new Runnable() {  
    public void run() {  
     try {  
      System.out.println(index);  
      Thread.sleep(2000);  
     } catch (InterruptedException e) {  
      e.printStackTrace();  
     }  
    }  
   });

3, the ExecutorService

ExecutorService methods:

1, execute(Runnable) this method accepts an instance of Runnable and executes asynchronously. The problem with this approach is that there is no way to know the execution results of Task. If we want to get the execution result of task, we can pass in an instance of Callable.

2, submit(Runnable) submit(Runnable) and execute(Runnable) are different in that the former can return a Future object. By returning the Future object, we can check whether the submitted task has been completed. If the task completes, the future.get() method returns null. Note that the future.get() method causes a block.

3, submit(Callable) submit(Callable), similar to submit(Runnable), also returns a Future object, but otherwise submit(Callable) receives an implementation of a Callable. The call() method in the Callable interface has a return value that can return the execution result of the task, while the run() method in the Runnable interface is void. No return value. If the task completes, the future.get() method returns the execution result of the Callable task. Note that the future.get() method causes a block.

4, invokeAny (…). this method receives a collection of Callable. Execution of this method does not return the Future, but does return the result of one of the Callable tasks. This method does not guarantee that the result of which task is returned, but one of them anyway.

5 and invokeAll (…). invokeAll (…). With invokeAny (…). Similarly, a collection of Callable objects is received, but the former will return a List of Future objects corresponding to each Callable task after execution.

Shutdown of>ExecutorService:
1, shutdown() : stops receiving new tasks, and the original tasks continue

  1. stop receiving the new submit task;
  2. committed tasks (including those running and waiting in the queue), will continue to complete;
  3. wait until the second step is completed, then really stop;

2, shutdownNow() : stop receiving new tasks, the original task stops executing

  1. stops receiving the new submit task as shutdown();
  2. ignores tasks waiting in the queue;
  3. attempts to interrupt the task being performed;
  4. returns a list of unexecuted tasks;

3, awaitTermination(long timeOut, TimeUnit) : timeOut and TimeUnit are timeOut and unit
the current thread is blocking until:

  1. and other submitted tasks (including running and waiting in the queue) executed;
  2. or timeout (timeout and TimeUnit);
  3. or if the thread is interrupted, throw InterruptedException;

then monitors whether the ExecutorService has closed, returning either true (after the shutdown request all tasks have been completed) or false (timeout)

The difference between

4, shutdown() and shutdownNow()
shutdown() just closed the submission channel, using submit() was invalid; And the inside runs just the way it is supposed to, and then stops.
shutdownNow() instantly stops the thread pool, stopping both running and waiting tasks.

5, shutdown(), awaitTermination(),
shutdown(), no new task can be submitted. But after awaitTermination(), the submission can continue.
awaitTermination() is blocked and returns whether the thread pool has stopped (true/false); Shutdown () does not block.

1. Elegant shutdown, using shutdown()
; 2. Want to shutdown immediately, and get the list of unperformed tasks; 3. shutdown() > awaitTermination()

4, Semaphore

Semaphore is a helper class for thread synchronization that maintains the number of threads currently accessing itself and provides a synchronization mechanism. Semaphore allows you to control the number of threads accessing a resource at the same time, for example, to implement the number of concurrent accesses allowed for a file.

Semaphore initialization must provide the number of concurrent threads because Semaphore does not provide a function to update the number of concurrent threads.

1. The constructors of the class Semaphore are permits, representing the maximum number of permits for threads to execute code between acquire() and release() at the same time. Acquire () acquire() function is to consume 1 license every time this method is called. The function of acquire(n) is to consume n licenses every time this method is called. The function of release() is to dynamically add a license to the method every time it is called.
5, method release(n)‘s function is to dynamically add n licenses to the method every time it is called.
method acquirenterruptibly () means that threads waiting to enter acquire() method are not allowed to be interrupted.
7, the method available() returns the number of permits currently available in the Semaphore object.
8, method exhaustive () retrieves and returns the maximum number of permits, And will be available for permission to reset to 0
9, methods, getQueueLength () is the function of licensing the waiting thread number 10,
method hasQueueThreads () is used to determine any threads are waiting for the permission
11, fair and not fair semaphore:
sometimes to get permission to order associated with the sequence of thread starts, this signal will be divided into fair and not fair. The so-called fair semaphore is the order in which the lock is acquired depending on the order in which the thread is started, but it does not mean that 100% of the semaphore is acquired, only in probability, and the non-fair semaphore is independent.
e.g.

Semaphore semaphore = new Semaphore(1false);

False: means non-fair semaphore, i.e., the order in which threads start is independent of the order in which semaphore. Acquire () is called, i.e., the program started first does not mean that permission is obtained first.
True: fair semaphore, i.e., the order in which threads start is related to the order in which semaphore. Acquire () is called, i.e., the first started thread gets permission first.

12, tryAcquire() is used to try to obtain 1 license. Returns false if it cannot be retrieved, usually in conjunction with an if statement, which is non-blocking. The non-blocking feature prevents the synchronization from being in a state of continuous waiting.
13, method tryAcquire(n) is to try to obtain n licenses, if not, return false
14, method tryAcquire(long timeout, TimeUnit unit) is to try to obtain 1 license within a specified time. The function of tryAcquire(int acquisition, long timeout, TimeUnit unit) is to attempt to obtain n permits within a specified time. Otherwise, false

can be returned

5, CountDownLatch

CountDownLatch is a class that causes one thread to wait until each of the other threads has completed its own execution.
is implemented by a counter whose initial value is the number of threads. After each thread has finished executing, the value of the counter is -1. When the value of the counter is 0, it means that all threads have finished executing, and then the thread waiting on the lock can resume its work.

CountDownLatch has only one constructor, and you need to pass in an int, which is typically your number of threads

The three methods in the

CountDownLatch class are the most important:

//调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public void await() throws InterruptedException { };   
//和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };  
//将count值减1
public void countDown() { };