Semaphore介绍
- 假设我们有五个线程执行任务,但是我们的资源却是有限的,只有三个资源。而只有拿到这个资源的线程才能执行任务,这个时候我们应该怎么做?
Semaphore 是 Java 中的一个同步工具类,翻译过来就是信号量,用来实现流量控制:控制并发访问资源的线程数量。它可以用来限制同时访问某个共享资源的线程数量,或者在某些场景下实现线程的互斥和同步。
无论是Synchroniezd还是ReentrantLock,一次都只允许一个线程访问一个资源,但是Semaphore可以指定多个线程同时访问某一个资源.
Semaphore有一个构造函数,可以传入一个int型整数n,表示某段代码最多只有n个线程可以访问,如果超出了n,那么请等待,等到某个线程执行完毕这段代码块,下一个线程再进入。
应用场景
信号量主要用于两个目的:
- 用于多个共享资源的互斥使用.
- 用于并发线程数的控制.
以下的例子:5个线程抢3个车位,同时最多只有3个线程能抢到车位,等其他线程释放信号量后,才能抢到车位.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public static void main(String[] args) { Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 5; i++) { new Thread(new Runnable() { @Override public void run() { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"抢到车位"); ThreadUtil.sleep(RandomUtil.randomInt(1000,5000)); System.out.println(Thread.currentThread().getName()+"归还车位"); } catch (InterruptedException e) { e.printStackTrace(); }finally { semaphore.release(); }
} },"线程"+i).start(); } }
|
注意事项
Semaphore.acquire()
和 Semaphore.release()
总是配对使用的,这点需要由应用代码自身保证。
Semaphore.release()
调用应该放在 finally
块中,以避免应用代码出现异常的情况下,当前线程所获得的信号量无法返还。
- 如果
Semaphore
构造器中的参数 permits
值设置为 1,所创建的 Semaphore
相当于一个互斥锁。与其他互斥锁不同的是,这种互斥锁允许一个线程释放另外一个线程所持有的锁。因为一个线程可以在未执行过 Semaphore.acquire()
的情况下执行相应的 Semaphore.release()
。
- 默认情况下,
Semaphore
采用的是非公平性调度策略。
Semaphore的方法
Semaphore 构造方法
1
| public Semaphore(int permits)
|
- 参数
permits
: 初始化信号量的许可数,表示允许同时访问共享资源的线程数。
1
| public Semaphore(int permits, boolean fair)
|
- 参数
fair
:
true
: 按照线程请求的顺序分配许可(FIFO)。
false
: 非公平模式,可能会使线程调度更高效。
常用方法
Semaphore 维护了一个可用的许可数量,线程需要获取许可才能继续执行,而许可数量会在每次获取和释放时进行调整。Semaphore 的主要方法包括:
1. void acquire()
功能: 获取一个许可。如果当前没有可用许可,则线程会阻塞,直到获取到许可或线程被中断。
详细说明:
- 调用此方法时,信号量的值会减 1。
- 如果信号量的值为 0,调用线程会被挂起进入等待队列。
示例:
1 2 3 4 5
| Semaphore semaphore = new Semaphore(1);
semaphore.acquire();
semaphore.release();
|
2. void acquire(int permits)
功能: 获取指定数量的许可。如果当前许可不足,则线程会阻塞。
详细说明:
- 一次性尝试获取多个许可。
- 如果许可不足,线程会被挂起,直到有足够的许可被释放。
示例:
1 2 3 4 5
| Semaphore semaphore = new Semaphore(5);
semaphore.acquire(3);
semaphore.release(3);
|
3. void release()
功能: 释放一个许可,将信号量的值加 1,并唤醒等待队列中的一个线程。
详细说明:
release()
可以被任何线程调用,而不要求是之前调用 acquire()
的线程。
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| Semaphore semaphore = new Semaphore(1);
new Thread(() -> { try { semaphore.acquire(); System.out.println("Thread 1 is running"); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } }).start();
new Thread(() -> { try { semaphore.acquire(); System.out.println("Thread 2 is running"); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } }).start();
|
4. void release(int permits)
功能: 释放指定数量的许可,将信号量的值增加相应的数量,并唤醒等待队列中的线程。
详细说明:
示例:
1 2 3 4 5 6
| Semaphore semaphore = new Semaphore(2);
semaphore.acquire(2); System.out.println("Both permits acquired");
semaphore.release(2);
|
5. boolean tryAcquire()
功能: 尝试获取一个许可,如果成功则立即返回 true
,否则返回 false
。
详细说明:
示例:
1 2 3 4 5 6 7 8 9 10 11
| Semaphore semaphore = new Semaphore(1);
if (semaphore.tryAcquire()) { try { System.out.println("Permit acquired"); } finally { semaphore.release(); } } else { System.out.println("Failed to acquire permit"); }
|
6. boolean tryAcquire(long timeout, TimeUnit unit)
功能: 在指定时间内尝试获取一个许可,如果在超时时间内获取成功,则返回 true
,否则返回 false
。
详细说明:
示例:
1 2 3 4 5 6 7 8 9 10 11
| Semaphore semaphore = new Semaphore(1);
if (semaphore.tryAcquire(2, TimeUnit.SECONDS)) { try { System.out.println("Permit acquired within timeout"); } finally { semaphore.release(); } } else { System.out.println("Timeout while waiting for permit"); }
|
7. int availablePermits()
功能: 返回当前信号量中可用的许可数量。
用途:
示例:
1 2 3 4 5
| Semaphore semaphore = new Semaphore(3);
System.out.println("Available permits: " + semaphore.availablePermits()); semaphore.acquire(); System.out.println("Available permits after acquire: " + semaphore.availablePermits());
|
8. int drainPermits()
功能: 获取并返回所有可用的许可,将信号量值置为 0。
用途:
示例:
1 2 3 4 5 6
| Semaphore semaphore = new Semaphore(5);
System.out.println("Available permits before drain: " + semaphore.availablePermits()); int drained = semaphore.drainPermits(); System.out.println("Drained permits: " + drained); System.out.println("Available permits after drain: " + semaphore.availablePermits());
|
9. boolean hasQueuedThreads()
功能: 检查是否有线程在等待信号量的许可。
用途:
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| Semaphore semaphore = new Semaphore(1);
new Thread(() -> { try { semaphore.acquire(); Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } }).start();
Thread.sleep(100); System.out.println("Has queued threads: " + semaphore.hasQueuedThreads());
|
应用
调用3个线程实现轮流打印数字从1至100:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| import java.util.concurrent.Semaphore;
public class SemaphoreSolution2 { private static int n = 1; private static final int total = 100;
public static void main(String[] args) { printNumbers(); } static void printNumbers(){ Semaphore s1 = new Semaphore(1); Semaphore s2 = new Semaphore(0); Semaphore s3 = new Semaphore(0);
Thread t1 = new Thread(new PrintTask(1, s1, s2), "Thread-1"); Thread t2 = new Thread(new PrintTask(2, s2, s3), "Thread-2"); Thread t3 = new Thread(new PrintTask(0, s3, s1), "Thread-3");
t1.start(); t2.start(); t3.start();
}
static class PrintTask implements Runnable{ private final int threadId; private final Semaphore current; private final Semaphore next;
public PrintTask(int threadId, Semaphore current, Semaphore next){ this.threadId = threadId; this.current = current; this.next = next; }
@Override public void run(){ while (n <= total) { try { current.acquire(); if (n <= total && n % 3 == threadId) { System.out.println(Thread.currentThread().getName() + " " + n); n++; } next.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
|
参考
https://juejin.cn/post/6844904049410768910