Java utils concurrent
1 synchronized1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31public class ThreadDemo1 {
public static void main(String[] args) {
Target target = new Target();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
target.sale();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
target.sale();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
target.sale();
}
}, "C").start();
}
}
class Target {
int a = 10;
public synchronized void sale() {
if (a > 0) {
System.out.println(a-- + Thread.currentThread().getName());
}
}
}
锁对象1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21package Thread;
public class ThreadDemo {
public static void main(String[] args) {
new Count().start();
new Count().start();
}
}
class Count extends Thread{
static int a = 10;
public void run() {
synchronized ((Object)a){
for(int i =0;i<3;i++) {
System.out.println(a-- +" "+Thread.currentThread().getName());
}
}
}
}
2.Lock
Lock是一个接口,有三个实现类
Lock使用1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37public class LockDemo {
public static void main(String[] args) {
Target target = new Target();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
target.sale();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
target.sale();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
target.sale();
}
}, "C").start();
}
}
class Target {
int a = 10;
Lock lock = new ReentrantLock();
public void sale() {
lock.lock();
try{
if (a > 0) {
System.out.println(a-- + Thread.currentThread().getName());
}}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
3.Condition精准唤醒
Condtion.await()
Condtion.signal();
Condition.signalAll();1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69public class ConditionDemo {
public static void main(String[] args) {
Test test = new Test();
new Thread(() -> {
test.A();
},"A").start();
new Thread(() -> {
test.B();
},"B").start();
new Thread(() -> {
test.C();
},"C").start();
}
}
class Test {
private int number = 0;
Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
Condition condition3 = lock.newCondition();
public void A() {
lock.lock();
try{
while(number!=0) {
condition1.await();
}
number=1;
System.out.println(Thread.currentThread().getName()+" "+number);
condition2.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void B() {
lock.lock();
try{
while(number!=1) {
condition2.await();
}
number=2;
System.out.println(Thread.currentThread().getName()+" "+number);
condition3.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void C() {
lock.lock();
try{
while(number!=2) {
condition3.await();
}
number=0;
System.out.println(Thread.currentThread().getName()+" "+number);
condition1.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
4.生产者消费者两种实现(Lock和synchronized)
Synchronized1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43public class SynchronizeDemo {
public static void main(String[] args) {
Number number = new Number();
new Thread(() ->{
try {
number.inc();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
new Thread(() ->{
try {
number.dec();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"B").start();
}
}
class Number{
int number= 0;
//while防止虚假唤醒
public synchronized void inc() throws InterruptedException {
//等待条件
while (number>0){
this.wait();
}
//操作
number++;
System.out.println(Thread.currentThread().getName()+number);
//唤醒
this.notifyAll();
}
public synchronized void dec() throws InterruptedException {
while (number<=0) {
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName()+number+"dec");
this.notifyAll();
}
}
Lock1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63public class LockDemo {
public static void main(String[] args) {
Number1 number = new Number1();
new Thread(() ->{
try {
number.inc();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
new Thread(() ->{
try {
number.dec();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"B").start();
}
}
class Number1{
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
int number= 0;
//while防止虚假唤醒
public void inc() throws InterruptedException {
lock.lock();
try {
//等待条件
while (number>0){
condition.await();
}
//操作
number++;
System.out.println(Thread.currentThread().getName()+number);
//唤醒
condition.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void dec() throws InterruptedException {
lock.lock();
try {
//等待条件
while (number<=0){
condition.await();
}
//操作
number--;
System.out.println(Thread.currentThread().getName()+number+ "dec");
//唤醒
condition.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
5.并发情况下集合会变的不安全的解决办法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
171.使用Vector
List list = new Vector();
底层使用sychronized
2.使用Collections.sychronized...
Collections.sychronizedList(new ArrayList())
Collections.sychronizedSet
Collections.sychronizedMap
底层是synchronized
3.使用CopyOnWriteArrayList()
CopyOnWriteSet()
底层的方法使用的是用lock加锁
4.对于Map,List
ConcurrentHashMap
ConcurrentLinkedDeque
ConcurrenLinkedQueue
ConcurrentSkipListMap
ConcurrentSkipListSet
6.线程的实现方式之Callable
通过继承Callable接口,实现call方法,然后由new Thread().start调用FutureTask来运行,
同时可以更具FutureTask获取返回值。
对于Callable而言,它可以有返回值,同时也可以抛出异常。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17public class ThreadCallable {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Task task = new Task();
FutureTask futureTask = new FutureTask(task);
new Thread(futureTask).start();
System.out.println(futureTask.get());
}
}
class Task implements Callable<String>{
public String call() throws Exception {
System.out.println("call");
return "hello";
}
}
7.并发工具类:
CountDownLatch 减法计数器,countdown到0时会执行下一步操作1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
int threadTimes = 7;
CountDownLatch countDownLatch = new CountDownLatch(threadTimes);
for(int i=0;i<7;i++) {
new Thread(()->{
System.out.println("等待");
countDownLatch.countDown();
}).start();
}
countDownLatch.await();
System.out.println("下一步");
}
}
CycliBarrier 加法计数器,加到一个初始值,在都执行下一步操作,等所有线程到达栅栏,在一起放行1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21public class CyclicBarrir {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
System.out.println("一起执行");
});
for(int i=0;i<7;i++) {
new Thread(()->{
System.out.println("积累");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
Semaphore:信号量,做限流操作,初始化证书,只有获取证书的线程才可以执行操作,执行完释放信号,其他再来获取,在执行,在释放,知道所有线程执行完
semaphore.aquire
semaphore.release1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2);
for(int i=0;i<6;i++){
new Thread(()->{
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"获取许可");
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName()+"离开");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();
}
},String.valueOf(i)).start();
}
}
}
8.读写锁ReadWriteLock
读写锁可以分别锁读和锁写操作,以至于读操作可以并发读,但是写同一时间只能有一个再写1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53public class ReadWriteLockDemo {
public static void main(String[] args) throws InterruptedException {
Cache cache = new Cache();
for(int i=0;i<5;i++) {
final int a=i;
new Thread(()->{
cache.put("key"+a,"value"+a);
},String.valueOf(i)).start();
}
for(int i=0;i<5;i++) {
final int a=i;
new Thread(()->{
cache.get("key"+a);
},String.valueOf(i)).start();
}
}
}
class Cache{
private volatile Map<String,Object> map=new HashMap<>();
private ReadWriteLock lock = new ReentrantReadWriteLock();
public void put(String key, Object value) {
lock.writeLock().lock();
try{
map.put(key,value);
System.out.println(Thread.currentThread().getName()+"write");
}catch (Exception e){
e.printStackTrace();
}finally {
lock.writeLock().unlock();
}
}
public void get(String key) {
lock.readLock().lock();
try {
map.get(key);
System.out.println(Thread.currentThread().getName()+"read");
}catch (Exception e) {
e.printStackTrace();
}finally {
lock.readLock().unlock();
}
}
}
9.队列的四组api
10.线程池
要使用ThreadPoolExecutor(七个参数)创建,防止OOM
拒绝策略
IO密集型和cpu密集型
函数型接口和断定型接口
消费型接口和供给型接口
Stream流式计算
forkJoin
将大人物拆分成小任务去执行,然后执行完毕在合并结果,效率更高。
要知道工作窃取是其特点,forkjoin维护的是一个双端队列,当一个队列执行完了,可以窃取另一个队列中的任务,完成执行,提高效率
Stream流是计算会比forkjoin更快
异步回调
future,可以异步执行
complete
CAS
比较并交换
自旋锁,不停比较,直到自己期望的值出现,while中的循环语句会从内存中获取公共的v,由于加了volatile,内存可见,然后再次用while条件中的cas再次获取内存中的v,看两次获取的v是否一样,一样的话证明内存没有被修改,可以+1更新内存,若是有,就再次自旋,直到两次获得的一样,再+1(两次比较)
v5内存中的值,v1是this对象,v2offset偏移量,v4怎加的值
可重入锁:在一个范围内,拿到一个锁,就相当于拿到了这个范围内的所有锁
补充:
线程状态: