
Java 8 Concurrency Tutorial (Repost)

Published: 2020-12-14 06:22:52 | Category: Java | Source: compiled from the web

Threads and Executors

Welcome to the first part of my Java 8 Concurrency tutorial. This guide teaches you concurrent programming in Java 8 with easily understood code examples. It's the first part of a series of tutorials covering the Java Concurrency API. In the next 15 minutes you'll learn how to execute code in parallel via threads, tasks and executor services.

  • Part 1: Threads and Executors
  • Part 2: Synchronization and Locks
  • Part 3:?

The Concurrency API was first introduced with the release of Java 5 and has been progressively enhanced with every new Java release since. The majority of concepts shown in this article also work in older versions of Java. However, my code samples focus on Java 8 and make heavy use of lambda expressions and other new features. If you're not yet familiar with lambdas, I recommend reading up on them first.

Threads and Runnables

All modern operating systems support concurrency via both processes and threads. Processes are instances of programs which typically run independently of each other, e.g. if you start a Java program the operating system spawns a new process which runs in parallel to other programs. Inside those processes we can use threads to execute code concurrently, so we can make the most of the available CPU cores.

Java has supported threads since JDK 1.0. Before starting a new thread you have to specify the code to be executed by this thread, often called the task. This is done by implementing Runnable - a functional interface defining a single void no-args method run() - as demonstrated in the following example:

Runnable task = () -> {
    String threadName = Thread.currentThread().getName();
    System.out.println("Hello " + threadName);
};

task.run();

Thread thread = new Thread(task);
thread.start();

System.out.println("Done!");

Since Runnable is a functional interface we can use Java 8 lambda expressions to print the current thread's name to the console. First we execute the runnable directly on the main thread before starting a new thread.

The result on the console might look like this:

Hello main
Hello Thread-0
Done!

Or that:

Hello main
Done!
Hello Thread-0

Due to concurrent execution we cannot predict whether the runnable will be invoked before or after printing 'Done!'. The order is non-deterministic, which makes concurrent programming a complex task in larger applications.

Threads can be put to sleep for a certain duration. This is quite handy to simulate long running tasks in the subsequent code samples of this article:

Thread thread = new Thread(runnable);
thread.start();

When you run the above code you'll notice the one second delay between the first and the second print statement. TimeUnit is a useful enum for working with units of time. Alternatively you can achieve the same by calling Thread.sleep(1000).
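The runnable handed to the thread could look roughly like the following. This is only a sketch: the class name SleepDemo, the helper sleepyTask and the printed labels are assumptions made for illustration, and the pause is a parameter so that the sketch can be tried with shorter delays.

```java
import java.util.concurrent.TimeUnit;

final class SleepDemo {

    // Hypothetical helper: prints, sleeps for the given number of milliseconds, prints again.
    static Runnable sleepyTask(long millis) {
        return () -> {
            try {
                String name = Thread.currentThread().getName();
                System.out.println("Foo " + name);
                TimeUnit.MILLISECONDS.sleep(millis);
                System.out.println("Bar " + name);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the interruption stays observable.
                Thread.currentThread().interrupt();
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(sleepyTask(1000));
        thread.start();
        thread.join(); // wait for the worker thread to finish
    }
}
```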

Working with the Thread class directly can be tedious and error-prone. For that reason the Concurrency API was introduced back in 2004 with the release of Java 5. The API is located in the package java.util.concurrent and contains many useful classes for concurrent programming. Since then the Concurrency API has been enhanced with every new Java release, and Java 8 again provides new classes and methods for dealing with concurrency.

Now let's take a deeper look at one of the most important parts of the Concurrency API - the executor services.

Executors

The Concurrency API introduces the concept of an ExecutorService as a higher-level replacement for working with threads directly. Executors are capable of running asynchronous tasks and typically manage a pool of threads, so we don't have to create new threads manually. All threads of the internal pool are reused under the hood for subsequent tasks, so we can run as many concurrent tasks as we want throughout the life cycle of our application with a single executor service.

This is what the first thread example looks like using executors:
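A minimal sketch of such a submission, assuming a single-thread executor from the Executors factory. The class name SingleThreadExecutorDemo and the helper sayHello are hypothetical, and the explicit shutdown() at the end is added only so that the sketch terminates when run on its own.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SingleThreadExecutorDemo {

    // Submits a runnable that prints the name of the pool thread executing it.
    static void sayHello(ExecutorService executor) {
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Hello " + threadName);
        });
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        sayHello(executor);
        // Without this call the pool thread would keep the JVM alive.
        executor.shutdown();
    }
}
```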

// => Hello pool-1-thread-1

The class Executors provides convenient factory methods for creating different kinds of executor services. In this sample we use an executor with a thread pool of size one.

The result looks similar to the above sample, but when running the code you'll notice an important difference: the Java process never stops! Executors have to be stopped explicitly - otherwise they keep listening for new tasks.

An ExecutorService provides two methods for that purpose: shutdown() stops accepting new tasks and lets already submitted tasks finish, while shutdownNow() interrupts all running tasks and shuts the executor down immediately.

This is how I typically shut down executors:


The executor shuts down softly by waiting a certain amount of time for termination of currently running tasks. After a maximum of five seconds the executor finally shuts down by interrupting all running tasks.
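The soft shutdown described above can be sketched as follows. The class name GracefulShutdown, the helper shutdownGracefully and its boolean return value are assumptions for illustration; only shutdown(), awaitTermination() and shutdownNow() are part of ExecutorService.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class GracefulShutdown {

    // Asks the executor to finish, waits up to the given timeout, then cancels what is left.
    // Returns true if all tasks finished within the timeout.
    static boolean shutdownGracefully(ExecutorService executor, long timeout, TimeUnit unit) {
        boolean finishedInTime = false;
        try {
            executor.shutdown(); // stop accepting new tasks
            finishedInTime = executor.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        } finally {
            if (!finishedInTime) {
                System.err.println("cancelling unfinished tasks");
            }
            executor.shutdownNow(); // interrupts tasks that are still running
        }
        return finishedInTime;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> System.out.println("Hello " + Thread.currentThread().getName()));
        shutdownGracefully(executor, 5, TimeUnit.SECONDS);
    }
}
```

Calling shutdownNow() in the finally block is harmless when everything has already terminated, and it guarantees that a stuck task cannot keep the JVM alive.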

Callables and Futures

In addition to Runnable, executors support another kind of task named Callable. Callables are functional interfaces just like runnables, but instead of being void they return a value.

This lambda expression defines a callable returning an integer after sleeping for one second:
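A sketch of such a callable, assuming the value 123 that the text refers to later. The class name CallableDemo and the factory slowAnswer are hypothetical, and the sleep time is a parameter so the sketch can be tried with shorter pauses.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

final class CallableDemo {

    // Hypothetical factory: a callable that sleeps for the given time and then returns 123.
    static Callable<Integer> slowAnswer(long sleepMillis) {
        return () -> {
            try {
                TimeUnit.MILLISECONDS.sleep(sleepMillis);
                return 123;
            } catch (InterruptedException e) {
                throw new IllegalStateException("task interrupted", e);
            }
        };
    }

    public static void main(String[] args) throws Exception {
        Callable<Integer> task = slowAnswer(1000);
        // Calling call() directly runs the task on the current thread.
        System.out.println(task.call());
    }
}
```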


Callables can be submitted to executor services just like runnables. But what about the callable's result? Since submit() doesn't wait until the task completes, the executor service cannot return the result of the callable directly. Instead the executor returns a special result of type Future which can be used to retrieve the actual result at a later point in time.

ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(task);

System.out.println("future done? " + future.isDone());

Integer result = future.get();

System.out.println("future done? " + future.isDone());
System.out.print("result: " + result);

After submitting the callable to the executor we first check via isDone() whether the future has already finished execution. I'm pretty sure this isn't the case, since the above callable sleeps for one second before returning the integer.

Calling the method get() blocks the current thread and waits until the callable completes before returning the actual result 123. Now the future is finally done and we see the following result on the console:

future done? false
future done? true
result: 123

Futures are tightly coupled to the underlying executor service. Keep in mind that calling get() on a future whose task has not yet completed will throw an exception if you shut the executor down abruptly.
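The following sketch illustrates one such case under stated assumptions: the task is already running when shutdownNow() is called, and it converts the resulting interruption into an IllegalStateException. The class name ShutdownNowDemo, the method getAfterShutdownNow and the latch are illustrative additions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class ShutdownNowDemo {

    // Starts a long-running task, shuts the pool down abruptly and
    // returns the exception that future.get() throws as a result.
    static Exception getAfterShutdownNow() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);

        Future<Integer> future = executor.submit(() -> {
            started.countDown();
            try {
                TimeUnit.SECONDS.sleep(10);
                return 123;
            } catch (InterruptedException e) {
                throw new IllegalStateException("task interrupted", e);
            }
        });

        started.await();        // make sure the task is running, not just queued
        executor.shutdownNow(); // interrupts the sleeping worker thread

        try {
            future.get();
            return null;        // only reached if the task finished normally
        } catch (ExecutionException e) {
            return e;           // wraps the IllegalStateException thrown by the task
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(getAfterShutdownNow());
    }
}
```

The latch matters here: a task that is still waiting in the queue when shutdownNow() is called is never started, so the sketch waits until the task is running before shutting down.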


You might have noticed that the creation of the executor slightly differs from the previous example. We use newFixedThreadPool(1) to create an executor service backed by a thread pool of size one. This is equivalent to newSingleThreadExecutor(), but we could later increase the pool size by simply passing a value larger than one.

Timeouts

Any call to future.get() will block and wait until the underlying callable has terminated. In the worst case a callable runs forever, making your application unresponsive. You can counteract those scenarios by passing a timeout:

Future<Integer> future = executor.submit(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
        return 123;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e);
    }
});

future.get(1, TimeUnit.SECONDS);

Executing the above code results in a TimeoutException.


You might already have guessed why this exception is thrown: We specified a maximum wait time of one second but the callable actually needs two seconds before returning the result.
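In practice the timeout is usually caught and handled. A possible sketch follows; the class name TimeoutDemo, the method getWithTimeout and the fallback value -1 are assumptions chosen for illustration, while get(timeout, unit) and cancel(true) are standard Future methods.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimeoutDemo {

    // Waits at most timeoutMillis for a task that needs taskMillis.
    // Returns the task's result, or -1 (an illustrative fallback) on timeout.
    static int getWithTimeout(long taskMillis, long timeoutMillis)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            Future<Integer> future = executor.submit(() -> {
                TimeUnit.MILLISECONDS.sleep(taskMillis);
                return 123;
            });
            try {
                return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                future.cancel(true); // interrupt the task, its result is no longer needed
                return -1;
            }
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(getWithTimeout(2000, 1000)); // the task is too slow
        System.out.println(getWithTimeout(10, 1000));   // the task finishes in time
    }
}
```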

InvokeAll

Executors support batch submission of multiple callables at once via invokeAll(). This method accepts a collection of callables and returns a list of futures.

List<Callable<String>> callables = Arrays.asList(
    () -> "task1",
    () -> "task2",
    () -> "task3");

executor.invokeAll(callables)
    .stream()
    .map(future -> {
        try {
            return future.get();
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
    })
    .forEach(System.out::println);

In this example we use Java 8 functional streams to process all futures returned by the invocation of invokeAll. We first map each future to its return value and then print each value to the console. If you're not yet familiar with streams, I recommend reading up on them first.

InvokeAny

Another way of batch-submitting callables is the method invokeAny(), which works slightly differently from invokeAll(). Instead of returning future objects this method blocks until the first callable terminates and returns the result of that callable.

In order to test this behavior we use this helper method to simulate callables with different durations. The method returns a callable that sleeps for a certain amount of time until returning the given result:
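A sketch of what such a helper could look like, together with a small driver. The signature callable(result, sleepSeconds) is inferred from the way the helper is called in this article. The class name InvokeAnyDemo and the method fastest are hypothetical, and the driver deliberately swaps in a fixed pool of three threads so that all three callables start together regardless of the number of CPU cores.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class InvokeAnyDemo {

    // Returns a callable that sleeps for sleepSeconds and then yields the given result.
    static Callable<String> callable(String result, long sleepSeconds) {
        return () -> {
            TimeUnit.SECONDS.sleep(sleepSeconds);
            return result;
        };
    }

    // Runs three callables of different durations and returns the first result.
    static String fastest() throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            List<Callable<String>> callables = Arrays.asList(
                callable("task1", 2),
                callable("task2", 1),
                callable("task3", 3));
            return executor.invokeAny(callables);
        } finally {
            executor.shutdownNow(); // the slower callables are no longer needed
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(fastest()); // task2 has the shortest sleep
    }
}
```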

  

We use this method to create a bunch of callables with different durations from one to three seconds. Submitting those callables to an executor via invokeAny() returns the string result of the fastest callable - in this case task2:

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
    callable("task1", 2),
    callable("task2", 1),
    callable("task3", 3));

String result = executor.invokeAny(callables);
System.out.println(result);

// => task2

The above example uses yet another type of executor created via newWorkStealingPool(). This factory method is part of Java 8 and returns an executor of type ForkJoinPool, which works slightly differently from normal executors. Instead of using a fixed-size thread pool, ForkJoinPools are created for a given parallelism size, which by default is the number of available cores of the host's CPU.

ForkJoinPools have existed since Java 7 and will be covered in detail in a later tutorial of this series. Let's finish this tutorial by taking a deeper look at scheduled executors.

Scheduled Executors

We've already learned how to submit and run tasks once on an executor. In order to run common tasks periodically, we can use scheduled thread pools.

A ScheduledExecutorService is capable of scheduling tasks to run either periodically or once after a certain amount of time has elapsed.

This code sample schedules a task to run after an initial delay of three seconds has passed:

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);

TimeUnit.MILLISECONDS.sleep(1337);

long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);
System.out.printf("Remaining Delay: %sms", remainingDelay);

Scheduling a task produces a specialized future of type ScheduledFuture which - in addition to the methods of Future - provides the method getDelay() to retrieve the remaining delay. After this delay has elapsed the task will be executed concurrently.

In order to schedule tasks to be executed periodically, executors provide the two methods scheduleAtFixedRate() and scheduleWithFixedDelay(). The first method is capable of executing tasks at a fixed rate, e.g. once every second as demonstrated in this example:

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());

int initialDelay = 0;
int period = 1;
executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);

Additionally this method accepts an initial delay which describes the leading wait time before the task will be executed for the first time.

Please keep in mind that scheduleAtFixedRate() doesn't take into account the actual duration of the task. So if you specify a period of one second but the task needs two seconds to execute, subsequent executions start late and the thread pool will be working to capacity very soon.

In that case you should consider using scheduleWithFixedDelay() instead. This method works just like the counterpart described above. The difference is that the wait time applies between the end of a task and the start of the next task. For example:

Runnable task = () -> {
    try {
        TimeUnit.SECONDS.sleep(2);
        System.out.println("Scheduling: " + System.nanoTime());
    }
    catch (InterruptedException e) {
        System.err.println("task interrupted");
    }
};

executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);

This example schedules a task with a fixed delay of one second between the end of an execution and the start of the next execution. The initial delay is zero and the task's duration is two seconds. So executions start at 0s, 3s, 6s, 9s and so on. As you can see, scheduleWithFixedDelay() is handy if you cannot predict the duration of the scheduled tasks.
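To observe this spacing without waiting several seconds, the same pattern can be tried with millisecond values. The sketch below is an illustration only: the class name FixedDelayDemo, the method runThreeTimes and the latch used to stop after three runs are assumptions, not part of the scheduling API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class FixedDelayDemo {

    // Runs a task three times with a fixed delay between runs and
    // returns the start time of each run in milliseconds since scheduling.
    static List<Long> runThreeTimes(long taskMillis, long delayMillis) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        List<Long> startTimes = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);
        long origin = System.nanoTime();

        executor.scheduleWithFixedDelay(() -> {
            if (done.getCount() == 0) {
                return; // ignore a run that slips in after the third one
            }
            startTimes.add(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - origin));
            try {
                TimeUnit.MILLISECONDS.sleep(taskMillis); // simulated work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            done.countDown();
        }, 0, delayMillis, TimeUnit.MILLISECONDS);

        done.await();           // wait for three completed runs
        executor.shutdownNow(); // periodic tasks run until the executor is stopped
        return new ArrayList<>(startTimes);
    }

    public static void main(String[] args) throws InterruptedException {
        // Start times are spaced by roughly task duration plus delay.
        System.out.println(runThreeTimes(200, 100));
    }
}
```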

This was the first part of a series of concurrency tutorials. I recommend practicing the code samples shown here on your own. All code samples from this article are available in the accompanying repository, so feel free to fork it.

Synchronization and Locks

Welcome to the second part of my Java 8 Concurrency Tutorial, a series of guides teaching multi-threaded programming in Java 8 with easily understood code examples. In the next 15 minutes you'll learn how to synchronize access to mutable shared variables via the synchronized keyword, locks and semaphores.

  • Part 1: Threads and Executors
  • Part 2: Synchronization and Locks
  • Part 3:?

The majority of concepts shown in this article also work in older versions of Java. However, the code samples focus on Java 8 and make heavy use of lambda expressions and new concurrency features. If you're not yet familiar with lambdas, I recommend reading up on them first.

For simplicity the code samples of this tutorial make use of the two helper methods sleep(seconds) and stop(executor).
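A possible shape for these two helpers is sketched below. The class name ConcurrentUtils, the 60 second limit and the log messages are assumptions for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class ConcurrentUtils {

    // Sleeps for the given number of seconds; an interruption is rethrown unchecked.
    static void sleep(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    // Shuts the executor down, waiting up to 60 seconds (an assumed limit) for running tasks.
    static void stop(ExecutorService executor) {
        try {
            executor.shutdown();
            executor.awaitTermination(60, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            System.err.println("termination interrupted");
        } finally {
            if (!executor.isTerminated()) {
                System.err.println("killing non-finished tasks");
            }
            executor.shutdownNow();
        }
    }
}
```

With a static import of both methods, calls such as sleep(1) and stop(executor) read exactly as they appear in the samples.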

Synchronized

In the previous tutorial we learned how to execute code in parallel via executor services. When writing such multi-threaded code you have to pay particular attention when accessing shared mutable variables concurrently from multiple threads. Let's say we want to increment an integer which is accessible simultaneously from multiple threads.

We define a field count with a method increment() to increase count by one:

int count = 0;

void increment() {
    count = count + 1;
}

When calling this method concurrently from multiple threads we're in serious trouble:

IntStream.range(0, 10000)
    .forEach(i -> executor.submit(this::increment));

stop(executor);

System.out.println(count);  // 9965

Instead of seeing a constant result count of 10000, the actual result varies with every execution of the above code. The reason is that we share a mutable variable between different threads without synchronizing access to it, which results in a race condition.

Three steps have to be performed in order to increment the number: (i) read the current value, (ii) increase this value by one and (iii) write the new value to the variable. If two threads perform these steps in parallel, it's possible that both threads perform step (i) simultaneously and thus read the same current value. This results in lost writes, so the actual result is lower. In the above sample 35 increments got lost due to concurrent unsynchronized access to count, but you may see different results when executing the code yourself.

Luckily Java has supported thread synchronization since the early days via the synchronized keyword. We can use synchronized to fix the above race condition when incrementing the count:
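A minimal sketch of a synchronized counter, wrapped in a hypothetical class SynchronizedCounter so it can be run on its own. The method name incrementSync follows the text; get() and runDemo() are illustrative additions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

final class SynchronizedCounter {

    private int count = 0;

    // Only one thread at a time can execute this method on the same instance.
    synchronized void incrementSync() {
        count = count + 1;
    }

    synchronized int get() {
        return count;
    }

    // Increments the counter 10000 times from two threads and returns the final value.
    static int runDemo() throws InterruptedException {
        SynchronizedCounter counter = new SynchronizedCounter();
        ExecutorService executor = Executors.newFixedThreadPool(2);
        IntStream.range(0, 10000)
            .forEach(i -> executor.submit(counter::incrementSync));
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo()); // 10000
    }
}
```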


When using incrementSync() concurrently we get the desired result count of 10000. No race conditions occur any longer and the result is stable with every execution of the code:

IntStream.range(0, 10000)
    .forEach(i -> executor.submit(this::incrementSync));

stop(executor);

System.out.println(count);  // 10000

The synchronized keyword is also available as a block statement.
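As a sketch, the block form could look like this. The class name BlockCounter and the driver runDemo are assumptions for illustration; the block locks on this, which is the same monitor a synchronized instance method would use.

```java
final class BlockCounter {

    private int count = 0;

    void incrementSync() {
        // Only the statements inside the block are guarded by the monitor of this object.
        synchronized (this) {
            count = count + 1;
        }
    }

    // Increments the counter 5000 times from each of two threads and returns the final value.
    static int runDemo() throws InterruptedException {
        BlockCounter counter = new BlockCounter();
        Runnable work = () -> {
            for (int i = 0; i < 5000; i++) {
                counter.incrementSync();
            }
        };
        Thread first = new Thread(work);
        Thread second = new Thread(work);
        first.start();
        second.start();
        first.join();
        second.join();
        return counter.count; // join() makes the final value visible to this thread
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo()); // 10000
    }
}
```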


Internally Java uses a so-called monitor, also known as a monitor lock or intrinsic lock, in order to manage synchronization. This monitor is bound to an object, e.g. when using synchronized methods all methods share the same monitor of the corresponding object.

All implicit monitors are reentrant. Reentrant means that locks are bound to the current thread: a thread can safely acquire the same lock multiple times without running into deadlocks (e.g. a synchronized method calls another synchronized method on the same object).
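Reentrancy can be tried out with a small sketch like the one below. The class name ReentrantMonitorDemo and its methods are made up for illustration; Thread.holdsLock() is a standard method that reports whether the current thread owns an object's monitor.

```java
final class ReentrantMonitorDemo {

    synchronized int outer() {
        // The calling thread owns the monitor of this object here
        // and enters it a second time when calling inner().
        return inner() + 1;
    }

    synchronized int inner() {
        // Reached without blocking, because the monitor is already held by this thread.
        return Thread.holdsLock(this) ? 1 : 0;
    }

    public static void main(String[] args) {
        System.out.println(new ReentrantMonitorDemo().outer()); // 2
    }
}
```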

Locks

Instead of using implicit locking via the synchronized keyword, the Concurrency API supports various explicit locks specified by the Lock interface. Locks support various methods for finer-grained lock control and are thus more expressive than implicit monitors.

Multiple lock implementations are available in the standard JDK; they are demonstrated in the following sections.

ReentrantLock

The class ReentrantLock is a mutual exclusion lock with the same basic behavior as the implicit monitors accessed via the synchronized keyword, but with extended capabilities. As the name suggests, this lock is reentrant just like implicit monitors.

Let's see what the above sample looks like using ReentrantLock:

ReentrantLock lock = new ReentrantLock();
int count = 0;

void increment() {
    lock.lock();
    try {
        count++;
    } finally {
        lock.unlock();
    }
}

A lock is acquired via lock() and released via unlock(). It's important to wrap your code in a try/finally block to ensure unlocking in case of exceptions. This method is thread-safe just like the synchronized counterpart. If another thread has already acquired the lock, subsequent calls to lock() pause the current thread until the lock has been released. Only one thread can hold the lock at any given time.

Locks support various methods for fine-grained control as seen in the next sample:

ExecutorService executor = Executors.newFixedThreadPool(2);
ReentrantLock lock = new ReentrantLock();

executor.submit(() -> {
    lock.lock();
    try {
        sleep(1);
    } finally {
        lock.unlock();
    }
});

executor.submit(() -> {
    System.out.println("Locked: " + lock.isLocked());
    System.out.println("Held by me: " + lock.isHeldByCurrentThread());
    boolean locked = lock.tryLock();
    System.out.println("Lock acquired: " + locked);
    if (locked) {
        lock.unlock(); // release again if tryLock() succeeded
    }
});

stop(executor);

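While the first task holds the lock for one second, the second task obtains different information about the current state of the lock. Assuming the first task has already acquired the lock by then, the output is:

    Locked: true
    Held by me: false
    Lock acquired: false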

The method tryLock(), as an alternative to lock(), tries to acquire the lock without pausing the current thread. The boolean result must be checked to verify that the lock has actually been acquired before accessing any shared mutable variables.
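The following sketch shows one way to act on the boolean result of tryLock(). The class TryLockDemo and its methods are hypothetical names I made up for this illustration. The demo holds the lock on the calling thread and lets a second thread attempt the update, first while the lock is held and then after it has been released.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class: updates shared state only when tryLock() succeeds.
class TryLockDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private int count = 0;

    // Increments only if the lock is free; reports whether the update happened.
    boolean tryIncrement() {
        if (!lock.tryLock()) {
            return false; // lock is busy: do not touch the shared variable
        }
        try {
            count++;
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Calls tryIncrement() on a separate thread and returns its result.
    static boolean tryFromOtherThread(TryLockDemo demo) {
        boolean[] result = new boolean[1];
        Thread t = new Thread(() -> result[0] = demo.tryIncrement());
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    // Returns { result while the lock is held elsewhere, result after release }.
    static boolean[] runDemo() {
        TryLockDemo demo = new TryLockDemo();
        boolean whileHeld;
        demo.lock.lock(); // the calling thread holds the lock
        try {
            whileHeld = tryFromOtherThread(demo);
        } finally {
            demo.lock.unlock();
        }
        boolean afterRelease = tryFromOtherThread(demo);
        return new boolean[] { whileHeld, afterRelease };
    }

    public static void main(String[] args) {
        boolean[] r = runDemo();
        System.out.println(r[0] + " " + r[1]); // prints "false true"
    }
}
```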

ReadWriteLock

The interface ReadWriteLock specifies another type of lock, maintaining a pair of locks for read and write access. The idea behind read-write locks is that it's usually safe to read mutable variables concurrently as long as nobody is writing to them. So the read lock can be held simultaneously by multiple threads as long as no thread holds the write lock. This can improve performance and throughput when reads are more frequent than writes.

ExecutorService executor = Executors.newFixedThreadPool(2);
Map<String, String> map = new HashMap<>();
ReadWriteLock lock = new ReentrantReadWriteLock();

executor.submit(() -> {
    lock.writeLock().lock();
    try {
        sleep(1);
        map.put("foo", "bar");
    } finally {
        lock.writeLock().unlock();
    }
});

The above example first acquires a write lock in order to put a new value into the map after sleeping for one second. Before this task has finished, two other tasks are submitted that try to read the entry from the map and then sleep for one second:

executor.submit(readTask);
executor.submit(readTask);

stop(executor);

When you execute this code sample you'll notice that both read tasks have to wait the whole second until the write task has finished. After the write lock has been released both read tasks are executed in parallel and print the result simultaneously to the console. They don't have to wait for each other to finish because read-locks can safely be acquired concurrently as long as no write-lock is held by another thread.
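Here is a compact, self-contained sketch of the scenario described above: one writer followed by two readers. The class name ReadWriteDemo, the CountDownLatch that makes sure the writer grabs the lock first, and the shortened 200 ms sleep are assumptions I added to make the run deterministic; the read task is my own reconstruction of what such a task could look like.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical example class: one writer, then two concurrent readers.
class ReadWriteDemo {
    static List<String> runDemo() {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        Map<String, String> map = new HashMap<>();
        ReadWriteLock lock = new ReentrantReadWriteLock();
        CountDownLatch writerHasLock = new CountDownLatch(1);
        try {
            executor.submit(() -> {
                lock.writeLock().lock();
                try {
                    writerHasLock.countDown(); // signal: write lock is held
                    TimeUnit.MILLISECONDS.sleep(200);
                    map.put("foo", "bar");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    lock.writeLock().unlock();
                }
            });

            writerHasLock.await(); // readers are only submitted once the writer holds the lock

            Callable<String> readTask = () -> {
                lock.readLock().lock(); // blocks until the writer is done
                try {
                    return map.get("foo");
                } finally {
                    lock.readLock().unlock();
                }
            };
            Future<String> first = executor.submit(readTask);
            Future<String> second = executor.submit(readTask);
            return Arrays.asList(first.get(), second.get());
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints [bar, bar]
    }
}
```

Both readers see the value written under the write lock, because they cannot acquire their read locks before the writer has released its lock.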

StampedLock

Java 8 ships with a new kind of lock called StampedLock, which also supports read and write locks just like in the example above. In contrast to ReadWriteLock, the locking methods of a StampedLock return a stamp represented by a long value. You can use these stamps to either release a lock or to check if the lock is still valid. Additionally, stamped locks support another lock mode called optimistic locking.

Let's rewrite the last example code to use StampedLock instead of ReadWriteLock:

ExecutorService executor = Executors.newFixedThreadPool(2);
Map<String, String> map = new HashMap<>();
StampedLock lock = new StampedLock();

executor.submit(() -> {
    long stamp = lock.writeLock();
    try {
        sleep(1);
        map.put("foo", "bar");
    } finally {
        lock.unlockWrite(stamp);
    }
});

Runnable readTask = () -> {
    long stamp = lock.readLock();
    try {
        System.out.println(map.get("foo"));
        sleep(1);
    } finally {
        lock.unlockRead(stamp);
    }
};

executor.submit(readTask);
executor.submit(readTask);

stop(executor);

Obtaining a read or write lock via readLock() or writeLock() returns a stamp which is later used for unlocking within the finally block. Keep in mind that stamped locks aren't reentrant. Each call to lock returns a new stamp and blocks if no lock is available, even if the same thread already holds a lock. So you have to pay particular attention not to run into deadlocks.

Just like in the previous ReadWriteLock example, both read tasks have to wait until the write lock has been released. Then both read tasks print to the console simultaneously, because multiple reads don't block each other as long as no write lock is held.
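The difference in reentrancy can be checked with a small single-threaded sketch. The class ReentrancyDemo and its method names are hypothetical. I use the non-blocking tryWriteLock() for the second attempt, since calling writeLock() twice from the same thread would block forever.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.StampedLock;

// Hypothetical example class contrasting reentrant and non-reentrant locks.
class ReentrancyDemo {
    // A ReentrantLock can be acquired again by the thread that already holds it.
    static int reentrantHoldCount() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock(); // second acquisition by the same thread succeeds
        int holds = lock.getHoldCount();
        lock.unlock();
        lock.unlock();
        return holds;
    }

    // A StampedLock is not reentrant: a second write attempt fails while the first is held.
    static long secondWriteStamp() {
        StampedLock lock = new StampedLock();
        long first = lock.writeLock();
        try {
            return lock.tryWriteLock(); // non-blocking, returns zero if unavailable
        } finally {
            lock.unlockWrite(first);
        }
    }

    public static void main(String[] args) {
        System.out.println(reentrantHoldCount()); // prints 2
        System.out.println(secondWriteStamp());   // prints 0
    }
}
```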

The next example demonstrates optimistic locking:

ExecutorService executor = Executors.newFixedThreadPool(2);
StampedLock lock = new StampedLock();

executor.submit(() -> {
    // An optimistic read holds no lock, so there is nothing to unlock afterwards.
    long stamp = lock.tryOptimisticRead();
    System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
    sleep(1);
    System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
    sleep(2);
    System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
});

executor.submit(() -> {
    long stamp = lock.writeLock();
    try {
        System.out.println("Write Lock acquired");
        sleep(2);
    } finally {
        lock.unlock(stamp);
        System.out.println("Write done");
    }
});

stop(executor);

An optimistic read lock is acquired by calling tryOptimisticRead(), which always returns a stamp without blocking the current thread, no matter whether the lock is actually available. If a write lock is already active, the returned stamp equals zero. You can always check whether a stamp is valid by calling lock.validate(stamp).

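Executing the above code typically results in the following output:

    Optimistic Lock Valid: true
    Write Lock acquired
    Optimistic Lock Valid: false
    Write done
    Optimistic Lock Valid: false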

The optimistic lock is valid right after acquiring the lock. In contrast to normal read locks, an optimistic lock doesn't prevent other threads from obtaining a write lock instantaneously. After sending the first thread to sleep for one second, the second thread obtains a write lock without waiting for the optimistic read lock to be released. From this point on, the optimistic read lock is no longer valid. Even when the write lock is released, the optimistic read lock stays invalid.

So when working with optimistic locks, you have to validate the lock every time after accessing any shared mutable variable to make sure the read was still valid.
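A common way to apply this rule is to read optimistically, validate, and fall back to a regular read lock if validation fails. The sketch below follows that pattern. The class OptimisticPoint and its fields are hypothetical names chosen for illustration only.

```java
import java.util.concurrent.locks.StampedLock;

// Hypothetical example class: optimistic read with validation and fallback.
class OptimisticPoint {
    private final StampedLock lock = new StampedLock();
    private double x, y;

    void move(double dx, double dy) {
        long stamp = lock.writeLock();
        try {
            x += dx;
            y += dy;
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    double distanceFromOrigin() {
        long stamp = lock.tryOptimisticRead();
        double curX = x, curY = y;       // read without holding a lock
        if (!lock.validate(stamp)) {     // a write intervened, or the stamp was zero
            stamp = lock.readLock();     // fall back to a real read lock
            try {
                curX = x;
                curY = y;
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return Math.sqrt(curX * curX + curY * curY);
    }

    // Takes an optimistic stamp, performs a write, then validates the old stamp.
    static boolean stampSurvivesWrite() {
        OptimisticPoint p = new OptimisticPoint();
        long stamp = p.lock.tryOptimisticRead();
        p.move(1, 1);
        return p.lock.validate(stamp);
    }

    public static void main(String[] args) {
        OptimisticPoint p = new OptimisticPoint();
        p.move(3, 4);
        System.out.println(p.distanceFromOrigin()); // prints 5.0
        System.out.println(stampSurvivesWrite());   // prints false
    }
}
```

Note that the optimistic path never calls an unlock method, since an optimistic stamp doesn't hold any lock.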

Sometimes it's useful to convert a read lock into a write lock without unlocking and locking again. StampedLock provides the method tryConvertToWriteLock() for that purpose, as seen in the next sample:

ExecutorService executor = Executors.newFixedThreadPool(2);
StampedLock lock = new StampedLock();

executor.submit(() -> {
    long stamp = lock.readLock();
    try {
        if (count == 0) {
            long writeStamp = lock.tryConvertToWriteLock(stamp);
            if (writeStamp == 0L) {
                System.out.println("Could not convert to write lock");
                // not reentrant: release the read lock before blocking for the write lock
                lock.unlockRead(stamp);
                writeStamp = lock.writeLock();
            }
            stamp = writeStamp;
            count = 23;
        }
        System.out.println(count);
    } finally {
        lock.unlock(stamp);
    }
});

stop(executor);

The task first obtains a read lock and prints the current value of the field count to the console. But if the current value is zero, we want to assign a new value of 23. We first have to convert the read lock into a write lock so as not to break potential concurrent access by other threads. Calling tryConvertToWriteLock() doesn't block but may return a zero stamp, indicating that no write lock is currently available. In that case the read lock is still held, so we release it and call writeLock() to block the current thread until a write lock is available.

Semaphores

In addition to locks, the Concurrency API also supports counting semaphores. Whereas locks usually grant exclusive access to variables or resources, a semaphore is capable of maintaining whole sets of permits. This is useful in scenarios where you have to limit the amount of concurrent access to certain parts of your application.

Here's an example of how to limit access to a long-running task simulated by sleep(5):

ExecutorService executor = Executors.newFixedThreadPool(10);

Semaphore semaphore = new Semaphore(5);

Runnable longRunningTask = () -> {
    boolean permit = false;
    try {
        permit = semaphore.tryAcquire(1, TimeUnit.SECONDS);
        if (permit) {
            System.out.println("Semaphore acquired");
            sleep(5);
        } else {
            System.out.println("Could not acquire semaphore");
        }
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    } finally {
        if (permit) {
            semaphore.release();
        }
    }
};

IntStream.range(0, 10)
    .forEach(i -> executor.submit(longRunningTask));

stop(executor);

The executor can potentially run 10 tasks concurrently, but we use a semaphore of size 5, thus limiting concurrent access to 5. It's important to use a try/finally block to properly release the semaphore even in case of exceptions.

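Executing the above code results in the following output:

    Semaphore acquired
    Semaphore acquired
    Semaphore acquired
    Semaphore acquired
    Semaphore acquired
    Could not acquire semaphore
    Could not acquire semaphore
    Could not acquire semaphore
    Could not acquire semaphore
    Could not acquire semaphore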

The semaphore permits access to the actual long-running operation simulated by sleep(5) for up to 5 tasks at a time. Every subsequent call to tryAcquire() waits for the maximum timeout of one second and then fails, resulting in the console output that no semaphore could be acquired.
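To observe the limiting effect directly, the following sketch counts how many tasks are inside the guarded section at the same time. The class SemaphoreLimitDemo, the method maxConcurrent and the 50 ms sleep are assumptions made for this illustration. Unlike the sample above, it uses the blocking acquire() so that every task eventually runs.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class: measures peak concurrency inside a semaphore-guarded section.
class SemaphoreLimitDemo {
    // Runs `tasks` jobs on a pool of the same size while at most `permits` may work at once.
    // Returns the highest number of jobs observed working at the same time.
    static int maxConcurrent(int tasks, int permits) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(tasks);

        Runnable job = () -> {
            try {
                semaphore.acquire(); // blocks until a permit is free
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            try {
                int now = active.incrementAndGet();
                maxSeen.accumulateAndGet(now, Math::max);
                TimeUnit.MILLISECONDS.sleep(50); // simulated work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                active.decrementAndGet();
                semaphore.release();
            }
        };

        for (int i = 0; i < tasks; i++) {
            executor.submit(job);
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return maxSeen.get();
    }

    public static void main(String[] args) {
        // Never more than 3, even though the pool has 10 threads.
        System.out.println(maxConcurrent(10, 3));
    }
}
```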

This was the second part of a series of concurrency tutorials. More parts will be released in the near future, so stay tuned. As usual, you can find all code samples from this article on GitHub, so feel free to fork the repo and try it on your own.

I hope you've enjoyed this article. If you have any further questions, send me your feedback in the comments below.

  • Part 1: Threads and Executors
  • Part 2: Synchronization and Locks
  • Part 3: Atomic Variables and ConcurrentMap

Welcome to the third part of my tutorial series about multi-threaded programming in Java 8. This tutorial covers two important parts of the Concurrency API: Atomic Variables and Concurrent Maps. Both have been greatly improved with the introduction of lambda expressions and functional programming in the latest Java 8 release. All those new features are described with a bunch of easily understood code samples. Enjoy!

  • Part 1: Threads and Executors
  • Part 2: Synchronization and Locks
  • Part 3: Atomic Variables and ConcurrentMap

For simplicity, the code samples of this tutorial make use of the two helper methods sleep(seconds) and stop(executor).
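The definitions of these helpers aren't reproduced in this article, so here is a minimal sketch of what they might look like. The class name ConcurrencyHelpers, the 60 second timeout and the messages are assumptions of mine, not necessarily what the original helpers do.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class; names, timeout and messages are assumptions.
class ConcurrencyHelpers {
    // Sleeps for the given number of seconds, wrapping the checked exception.
    static void sleep(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    // Shuts the executor down gracefully and forces termination if tasks remain.
    static void stop(ExecutorService executor) {
        try {
            executor.shutdown();
            executor.awaitTermination(60, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            System.err.println("termination interrupted");
        } finally {
            if (!executor.isTerminated()) {
                System.err.println("killing non-finished tasks");
            }
            executor.shutdownNow();
        }
    }
}
```

With helpers like these, the samples can call sleep(1) and stop(executor) without cluttering each task with exception handling.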

AtomicInteger

The package java.util.concurrent.atomic contains many useful classes to perform atomic operations. An operation is atomic when you can safely perform the operation in parallel on multiple threads without using the synchronized keyword or locks, as shown in the previous part of this series.

Internally, the atomic classes make heavy use of compare-and-swap (CAS), an atomic instruction directly supported by most modern CPUs. Those instructions are usually much faster than synchronizing via locks. So my advice is to prefer atomic classes over locks in case you just have to change a single mutable variable concurrently.
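To get a feeling for how CAS is typically used, here is a small sketch of a retry loop built on AtomicInteger.compareAndSet(). The class CasDemo and the method doubleValue are hypothetical names. This is only an illustration of the idea, not a description of how the JDK implements its atomic classes.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class: an explicit compare-and-set retry loop.
class CasDemo {
    // Doubles the value atomically and returns the new value.
    static int doubleValue(AtomicInteger value) {
        while (true) {
            int current = value.get();
            int next = current * 2;
            // Succeeds only if nobody changed the value since we read it.
            if (value.compareAndSet(current, next)) {
                return next;
            }
            // Another thread won the race: loop and retry with the fresh value.
        }
    }

    public static void main(String[] args) {
        AtomicInteger value = new AtomicInteger(21);
        System.out.println(doubleValue(value));          // prints 42
        System.out.println(value.compareAndSet(21, 0));  // prints false, the value is 42 by now
    }
}
```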

Now let's pick one of the atomic classes for a few examples: AtomicInteger

AtomicInteger atomicInt = new AtomicInteger(0);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000)
    .forEach(i -> executor.submit(atomicInt::incrementAndGet));

stop(executor);

System.out.println(atomicInt.get());    // => 1000

By using AtomicInteger as a replacement for Integer, we're able to increment the number concurrently in a thread-safe manner without synchronizing access to the variable. The method incrementAndGet() is an atomic operation, so we can safely call this method from multiple threads.

AtomicInteger supports various kinds of atomic operations. The method updateAndGet() accepts a lambda expression in order to perform arbitrary arithmetic operations upon the integer:

AtomicInteger atomicInt = new AtomicInteger(0);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000)
    .forEach(i -> {
        Runnable task = () ->
            atomicInt.updateAndGet(n -> n + 2);
        executor.submit(task);
    });

stop(executor);

System.out.println(atomicInt.get());    // => 2000

The method accumulateAndGet() accepts another kind of lambda expression of type IntBinaryOperator. We use this method to sum up all values from 0 to 999 concurrently in the next sample:

AtomicInteger atomicInt = new AtomicInteger(0);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000)
    .forEach(i -> {
        Runnable task = () ->
            atomicInt.accumulateAndGet(i, (n, m) -> n + m);
        executor.submit(task);
    });

stop(executor);

System.out.println(atomicInt.get());    // => 499500

Other useful atomic classes in the package java.util.concurrent.atomic include AtomicBoolean, AtomicLong and AtomicReference.

LongAdder

The class LongAdder can be used as an alternative to AtomicLong to consecutively add values to a number.

LongAdder adder = new LongAdder();
ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000)
    .forEach(i -> executor.submit(adder::increment));

stop(executor);

System.out.println(adder.sumThenReset());   // => 1000

LongAdder provides the methods add() and increment() just like the atomic number classes and is also thread-safe. But instead of summing up a single result, this class maintains a set of variables internally to reduce contention between threads. The actual result can be retrieved by calling sum() or sumThenReset().

This class is usually preferable to atomic numbers when updates from multiple threads are more common than reads. This is often the case when capturing statistical data, e.g. when you want to count the number of requests served by a web server. The drawback of LongAdder is higher memory consumption, because a set of variables is held in memory.
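The request-counting scenario can be sketched as follows. This is a minimal illustration, not code from the original tutorial: the class RequestCounterDemo, the helper countRequests() and the chosen numbers are assumptions, and a plain shutdown()/awaitTermination() pair stands in for the tutorial's stop() helper.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.IntStream;

class RequestCounterDemo {

    // Hypothetical helper: simulates `requests` served requests on a pool of
    // `threads` workers and returns the total recorded by the LongAdder.
    static long countRequests(int threads, int requests) {
        LongAdder served = new LongAdder();
        ExecutorService executor = Executors.newFixedThreadPool(threads);

        // Write-heavy phase: every task only increments the counter.
        IntStream.range(0, requests)
            .forEach(i -> executor.submit(served::increment));

        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        // Rare read: sum() adds up the internally maintained variables.
        return served.sum();
    }

    public static void main(String[] args) {
        System.out.println(countRequests(4, 10_000));   // => 10000
    }
}
```

The counter is read only once, after all writers have finished, which is the access pattern LongAdder is designed for.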

LongAccumulator

LongAccumulator is a more generalized version of LongAdder. Instead of performing simple add operations, the class LongAccumulator is built around a lambda expression of type LongBinaryOperator, as demonstrated in this code sample:

LongBinaryOperator op = (x, y) -> 2 * x + y;
LongAccumulator accumulator = new LongAccumulator(op, 1L);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 10)
    .forEach(i -> executor.submit(() -> accumulator.accumulate(i)));

stop(executor);

System.out.println(accumulator.getThenReset());     // => 2539 (can vary between runs)

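We create a LongAccumulator with the function 2 * x + y and an initial value of one. With every call to accumulate(i) both the current result and the value i are passed as parameters to the lambda expression. Note that this function is order-sensitive, so the printed value depends on how the updates interleave: LongAccumulator makes no guarantee about the order of accumulation and is best used with functions for which the order does not matter.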

A LongAccumulator, just like LongAdder, maintains a set of variables internally to reduce contention between threads.
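Since the order of accumulation is not guaranteed, an order-independent function gives a reproducible result. The following is a minimal sketch under that assumption, using Long::max with Long.MIN_VALUE as identity; the class MaxAccumulatorDemo and the helper maxOfRange() are illustrative names, and shutdown()/awaitTermination() replace the tutorial's stop() helper.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.stream.IntStream;

class MaxAccumulatorDemo {

    // Hypothetical helper: feeds 0 .. upper-1 into a max accumulator from two threads.
    static long maxOfRange(int upper) {
        // Long.MIN_VALUE is the identity of max, so it never wins against real input.
        LongAccumulator max = new LongAccumulator(Long::max, Long.MIN_VALUE);
        ExecutorService executor = Executors.newFixedThreadPool(2);

        IntStream.range(0, upper)
            .forEach(i -> executor.submit(() -> max.accumulate(i)));

        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        // get() combines the internal variables with the same function.
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println(maxOfRange(10));   // => 9
    }
}
```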

ConcurrentMap

The interface ConcurrentMap extends the Map interface and defines one of the most useful concurrent collection types. Java 8 adds new methods to this interface that accept lambda expressions, which allows a functional programming style.

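In the next code snippets we use the following sample map to demonstrate those new methods:

ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
map.put("foo", "bar");
map.put("han", "solo");
map.put("r2", "d2");
map.put("c3", "p0");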

The method forEach() accepts a lambda expression of type BiConsumer with both the key and value of the map passed as parameters. It can be used as a replacement for for-each loops to iterate over the entries of the concurrent map. The iteration is performed sequentially on the current thread.
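A minimal sketch of such an iteration, assuming a sample map with the entries foo=bar, han=solo, r2=d2 and c3=p0 (the entries visible in the output later in this section). The class ForEachDemo and its helpers are illustrative names; the lines are sorted only because the iteration order of the map is unspecified.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ForEachDemo {

    // Assumed sample data for this sketch.
    static ConcurrentMap<String, String> sampleMap() {
        ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
        map.put("foo", "bar");
        map.put("han", "solo");
        map.put("r2", "d2");
        map.put("c3", "p0");
        return map;
    }

    // Formats every entry as "key = value"; sorted because the map is unordered.
    static List<String> describe(ConcurrentMap<String, String> map) {
        List<String> lines = new ArrayList<>();
        // The BiConsumer receives key and value and runs on the calling thread.
        map.forEach((key, value) -> lines.add(key + " = " + value));
        Collections.sort(lines);
        return lines;
    }

    public static void main(String[] args) {
        describe(sampleMap()).forEach(System.out::println);
        // c3 = p0
        // foo = bar
        // han = solo
        // r2 = d2
    }
}
```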


The method putIfAbsent() puts a new value into the map only if no value exists for the given key. At least for the ConcurrentHashMap implementation, this method is thread-safe just like put(), so you don't have to synchronize when accessing the map concurrently from different threads:
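A minimal sketch of both outcomes; the class PutIfAbsentDemo, the helper run() and the sample keys and values are illustrative assumptions.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class PutIfAbsentDemo {

    static ConcurrentMap<String, String> run() {
        ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
        map.put("foo", "bar");

        // Key already present: the mapping is left alone, the existing value is returned.
        String existing = map.putIfAbsent("foo", "baz");
        System.out.println(existing);          // => bar

        // Key absent: the value is stored and null is returned.
        String previous = map.putIfAbsent("c3", "p0");
        System.out.println(previous);          // => null

        return map;
    }

    public static void main(String[] args) {
        ConcurrentMap<String, String> map = run();
        System.out.println(map.get("foo"));    // => bar
        System.out.println(map.get("c3"));     // => p0
    }
}
```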


The method getOrDefault() returns the value for the given key. In case no entry exists for this key, the passed default value is returned:
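A minimal sketch; the class GetOrDefaultDemo, the helper lookup() and the default value "there" are illustrative assumptions.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class GetOrDefaultDemo {

    // Returns the mapped value, or the fallback "there" if the key is unknown.
    static String lookup(ConcurrentMap<String, String> map, String key) {
        return map.getOrDefault(key, "there");
    }

    public static void main(String[] args) {
        ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
        map.put("han", "solo");

        System.out.println(lookup(map, "han"));   // => solo
        System.out.println(lookup(map, "hi"));    // => there
    }
}
```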


The method replaceAll() accepts a lambda expression of type BiFunction. BiFunctions take two parameters and return a single value. In this case the function is called with the key and the value of each map entry and returns a new value to be assigned for the current key:
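A minimal sketch that rewrites the value of a single key and leaves all other values unchanged; the class ReplaceAllDemo, the helper and the replacement value "d3" are illustrative assumptions.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ReplaceAllDemo {

    // The function is called for every entry and returns the value to store.
    static ConcurrentMap<String, String> replaceR2(ConcurrentMap<String, String> map) {
        map.replaceAll((key, value) -> "r2".equals(key) ? "d3" : value);
        return map;
    }

    public static void main(String[] args) {
        ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
        map.put("foo", "bar");
        map.put("r2", "d2");

        replaceR2(map);
        System.out.println(map.get("r2"));    // => d3
        System.out.println(map.get("foo"));   // => bar
    }
}
```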

 

Instead of replacing all values of the map, compute() lets us transform a single entry. The method accepts both the key to be computed and a bi-function to specify the transformation of the value.
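A minimal sketch of compute(); the class ComputeDemo and the helper doubleValue() are illustrative assumptions. The function also runs for absent keys, in which case it receives null, so the sketch guards against that and returns null to leave the key absent.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ComputeDemo {

    // Doubles the value of a single key; absent keys stay absent.
    static String doubleValue(ConcurrentMap<String, String> map, String key) {
        return map.compute(key, (k, value) -> value == null ? null : value + value);
    }

    public static void main(String[] args) {
        ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
        map.put("foo", "bar");

        System.out.println(doubleValue(map, "foo"));      // => barbar
        System.out.println(doubleValue(map, "missing"));  // => null
        System.out.println(map.containsKey("missing"));   // => false
    }
}
```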


In addition to compute(), two variants exist: computeIfAbsent() and computeIfPresent(). The functional parameters of these methods only get called if the key is absent or present respectively.
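A minimal sketch of both variants; the class ConditionalComputeDemo, the helper run() and all keys and values are illustrative assumptions.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ConditionalComputeDemo {

    static ConcurrentMap<String, String> run() {
        ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
        map.put("foo", "bar");

        // Key present: the function is not called, the existing value is returned.
        System.out.println(map.computeIfAbsent("foo", key -> "new"));             // => bar
        // Key absent: the function supplies the value to store.
        System.out.println(map.computeIfAbsent("han", key -> key + "-solo"));     // => han-solo

        // Key present: the function transforms the current value.
        System.out.println(map.computeIfPresent("foo", (key, value) -> value.toUpperCase())); // => BAR
        // Key absent: the function is not called and nothing is inserted.
        System.out.println(map.computeIfPresent("r2", (key, value) -> value + "!"));          // => null

        return map;
    }

    public static void main(String[] args) {
        run();
    }
}
```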

Finally, the method merge() can be utilized to unify a new value with an existing value in the map. Merge accepts a key, the new value to be merged into the existing entry and a bi-function to specify the merging behavior of both values:
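A minimal sketch of merge(); the class MergeDemo, the helper run() and the sample values are illustrative assumptions. If the key is absent, the new value is stored as-is and the function is not called.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class MergeDemo {

    static ConcurrentMap<String, String> run() {
        ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
        map.put("foo", "bar");

        // Key present: the function combines the old and the new value.
        map.merge("foo", "boo", (oldVal, newVal) -> newVal + " was " + oldVal);
        System.out.println(map.get("foo"));   // => boo was bar

        // Key absent: the new value is stored directly.
        map.merge("c3", "p0", (oldVal, newVal) -> newVal + " was " + oldVal);
        System.out.println(map.get("c3"));    // => p0

        return map;
    }

    public static void main(String[] args) {
        run();
    }
}
```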


ConcurrentHashMap

All the methods above are part of the ConcurrentMap interface and are thereby available to all implementations of that interface. In addition, the most important implementation, ConcurrentHashMap, has been further enhanced with a couple of new methods to perform parallel operations upon the map.

Just like parallel streams, those methods use a special ForkJoinPool available via ForkJoinPool.commonPool() in Java 8. This pool uses a preset parallelism which depends on the number of available cores. Four CPU cores are available on my machine, which results in a parallelism of three:
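The parallelism of the common pool can be inspected at runtime. A minimal sketch (the class name CommonPoolInfo is illustrative); by default the value is one less than the number of available processors, with a minimum of one, so the printed numbers depend on the machine:

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolInfo {

    // Targeted parallelism level of the shared common pool.
    static int parallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        System.out.println("cores:       " + Runtime.getRuntime().availableProcessors());
        System.out.println("parallelism: " + parallelism());
    }
}
```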


This value can be decreased or increased by setting the following JVM parameter (the value 5 is only an example):

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5


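We use the same example map for demonstration purposes, but this time we work with the concrete implementation ConcurrentHashMap instead of the interface ConcurrentMap, so we can access all public methods of this class:

ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
map.put("foo", "bar");
map.put("han", "solo");
map.put("r2", "d2");
map.put("c3", "p0");
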
Java 8 introduces three kinds of parallel operations: forEach, search and reduce. Each of those operations is available in four forms accepting functions with keys, values, entries and key-value pair arguments.

All of those methods use a common first argument called parallelismThreshold. This threshold indicates the minimum collection size at which the operation is executed in parallel. E.g. if you pass a threshold of 500 and the actual size of the map is 499, the operation will be performed sequentially on a single thread. In the next examples we use a threshold of one to always force parallel execution for demonstration purposes.

ForEach

The method forEach() is capable of iterating over the key-value pairs of the map in parallel. The lambda expression of type BiConsumer is called with the key and value of the current iteration step. In order to visualize parallel execution we print the current thread's name to the console. Keep in mind that in my case the underlying ForkJoinPool uses up to a maximum of three threads.

map.forEach(1, (key, value) ->
    System.out.printf("key: %s; value: %s; thread: %s\n",
        key, value, Thread.currentThread().getName()));

// key: r2; value: d2; thread: main
// key: foo; value: bar; thread: ForkJoinPool.commonPool-worker-1
// key: han; value: solo; thread: ForkJoinPool.commonPool-worker-2
// key: c3; value: p0; thread: main

Search

The method search() accepts a BiFunction returning a non-null search result for the current key-value pair, or null if the current iteration doesn't match the desired search criteria. As soon as a non-null result is returned, further processing is suppressed. Keep in mind that ConcurrentHashMap is unordered. The search function should not depend on the actual processing order of the map. If multiple entries of the map match the given search function, the result may be non-deterministic.

// ForkJoinPool.commonPool-worker-2
// main
// ForkJoinPool.commonPool-worker-3
// Result: bar
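A sketch of a key-based search that produces output of the shape shown above, assuming the sample map from this section and a threshold of one. The class SearchDemo and its helpers are illustrative names; which thread names are printed, and how many, differs between runs.

```java
import java.util.concurrent.ConcurrentHashMap;

class SearchDemo {

    // Assumed sample data for this sketch.
    static ConcurrentHashMap<String, String> sampleMap() {
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
        map.put("foo", "bar");
        map.put("han", "solo");
        map.put("r2", "d2");
        map.put("c3", "p0");
        return map;
    }

    // Returns the value stored under `wanted`, or null if no key matches.
    static String findValueForKey(ConcurrentHashMap<String, String> map, String wanted) {
        return map.search(1, (key, value) -> {
            System.out.println(Thread.currentThread().getName());
            // A non-null return value ends the search; null means "keep looking".
            return wanted.equals(key) ? value : null;
        });
    }

    public static void main(String[] args) {
        String result = findValueForKey(sampleMap(), "foo");
        System.out.println("Result: " + result);   // => Result: bar
    }
}
```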

Here's another example searching solely on the values of the map:

System.out.println("Result: " + result);

// ForkJoinPool.commonPool-worker-2
// main
// main
// ForkJoinPool.commonPool-worker-1
// Result: solo
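A sketch of a value-only search consistent with the result shown above. The search criterion (a value longer than three characters) is an assumption chosen so that "solo" is the only match; the class SearchValuesDemo and its helpers are illustrative names.

```java
import java.util.concurrent.ConcurrentHashMap;

class SearchValuesDemo {

    // Assumed sample data for this sketch.
    static ConcurrentHashMap<String, String> sampleMap() {
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
        map.put("foo", "bar");
        map.put("han", "solo");
        map.put("r2", "d2");
        map.put("c3", "p0");
        return map;
    }

    // Returns some value longer than `minLength` characters, or null if none exists.
    static String findLongValue(ConcurrentHashMap<String, String> map, int minLength) {
        return map.searchValues(1, value -> {
            System.out.println(Thread.currentThread().getName());
            return value.length() > minLength ? value : null;
        });
    }

    public static void main(String[] args) {
        String result = findLongValue(sampleMap(), 3);
        System.out.println("Result: " + result);   // => Result: solo
    }
}
```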

Reduce

The method reduce(), already known from Java 8 Streams, accepts two lambda expressions of type BiFunction. The first function transforms each key-value pair into a single value of any type. The second function combines all those transformed values into a single result, ignoring any possible null values.

System.out.println("Result: " + result);

// Transform: ForkJoinPool.commonPool-worker-2
// Transform: main
// Transform: ForkJoinPool.commonPool-worker-3
// Reduce: ForkJoinPool.commonPool-worker-3
// Transform: main
// Reduce: main
// Reduce: main
// Result: r2=d2, c3=p0, han=solo, foo=bar
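A sketch of a reduction that produces output of the shape shown above, assuming the sample map from this section and a threshold of one. The class ReduceDemo and its helpers are illustrative names; the order of the joined entries and the printed thread names are not deterministic.

```java
import java.util.concurrent.ConcurrentHashMap;

class ReduceDemo {

    // Assumed sample data for this sketch.
    static ConcurrentHashMap<String, String> sampleMap() {
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
        map.put("foo", "bar");
        map.put("han", "solo");
        map.put("r2", "d2");
        map.put("c3", "p0");
        return map;
    }

    // Transforms every entry to "key=value" and joins the pieces with ", ".
    static String joinEntries(ConcurrentHashMap<String, String> map) {
        return map.reduce(1,
            (key, value) -> {
                System.out.println("Transform: " + Thread.currentThread().getName());
                return key + "=" + value;
            },
            (s1, s2) -> {
                System.out.println("Reduce: " + Thread.currentThread().getName());
                return s1 + ", " + s2;
            });
    }

    public static void main(String[] args) {
        System.out.println("Result: " + joinEntries(sampleMap()));
    }
}
```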

I hope you've enjoyed reading the third part of my tutorial series about Java 8 Concurrency. The code samples from this tutorial are available in a public repository along with many other Java 8 code snippets. You're welcome to fork the repo and try it on your own.

If you want to support my work, please share this tutorial with your friends. You should also follow me on Twitter, as I constantly tweet about Java and programming related stuff.

  • Part 1: Threads and Executors
  • Part 2:
  • Part 3: Atomic Variables and ConcurrentMap

(Editor: 李大同)
