shared_future
- 若需要使用一个线程的结果,让多个线程获取呢?
可以使用shared_future
可多次调用
get():与std::future不同,shared_future的get()可多次调用线程安全:多个线程可同时调用
get(),但返回引用类型时要小心数据竞争异常传播:异常会被存储,每次
get()都会重新抛出生命周期:共享状态由所有副本共同管理,最后一个副本销毁时释放资源
复制廉价:复制操作只增加引用计数,适合传递到多个线程
值语义优先:尽量返回值类型而非引用类型,避免悬挂引用
检查有效性:使用前检查
valid(),避免操作空的shared_future内存模型:
get()提供memory_order_acquire语义,确保结果可见性
1 #include <iostream> 2 #include <thread> 3 #include <chrono> 4 #include <functional> 5 #include <future> 6 7 using namespace std; 8 void computeA(promise<int> &&prom){ 9 cout<< "进入A!" <<endl; 10 this_thread::sleep_for(chrono::seconds(5));//5s 11 cout<< "A完成! 5s" <<endl; 12 prom.set_value(1);//设置结果值 13 } 14 15 16 void computeB(shared_future<int> shared_fut){ 17 cout<< "进入B!"<< endl; 18 shared_fut.get(); //等待A完成 19 this_thread::sleep_for(chrono::seconds(4));//4s 20 cout<< "B完成! 4s" <<endl; 21 22 } 23 24 25 void computeC(shared_future<int> shared_fut){ 26 cout<< "进入C!" <<endl; 27 shared_fut.get(); 28 cout<< "C完成!" <<endl; 29 } 30 31 int main(){ 32 33 promise<int> prom_i; 34 future<int> resultFutI = prom_i.get_future(); 35 shared_future<int> shared_fut = resultFutI.share();//两步获取shared_future 36 // 直接从 promise 获取 shared_future 37 //share_future<int> share_fut = prom_i.get_future().share(); 38 39 thread threadA = thread(computeA , move(prom_i)); 40 41 thread threadB = thread(computeB , shared_fut); 42 43 thread threadC = thread(computeC , shared_fut); 44 45 46 threadA.join(); 47 48 threadB.join(); 49 50 threadC.join(); 51 }- aysnc
1 #include <iostream> 2 #include <thread> 3 #include <chrono> 4 #include <functional> 5 #include <future> 6 7 using namespace std; 8 int computeA(){ 9 cout<< "进入A!" <<endl; 10 this_thread::sleep_for(chrono::seconds(5));//5s 11 cout<< "A完成! 5s" <<endl; 12 return 100; 13 } 14 15 16 void computeB(shared_future<int> shared_fut){ 17 cout<< "进入B!"<< endl; 18 int result = shared_fut.get(); //等待A完成 19 this_thread::sleep_for(chrono::seconds(4));//4s 20 cout<< "B完成! 4s result = "<< result <<endl; 21 22 } 23 24 int main(){ 25 cout<<"main fun! "<<endl; 26 future<int> fut = async(launch::async,computeA); 27 shared_future<int> shared_fut = fut.share(); 28 thread threadA = thread(computeB,shared_fut); 29 for(int i = 0;i<50;i++){ 30 cout<<"main: i = "<< i <<endl; 31 } 32 threadA.join(); 33 return 0; 34 35 }线程同步
线程同步是多线程编程中协调线程执行顺序的机制,通过控制多个线程对共享资源的访问顺序,防止数据竞争与不可预知的数据损坏。其核心在于保证同一时刻仅有一个线程操作关键数据段。
为什么要线程同步
解决竞争条件和数据不一致性。线程同步的本质就是保证数据操作原子性。
线程同步的方法:
互斥锁、读写锁、条件变量、原子变量、线程局部存储。
互斥锁 mutex和原子变量 atomic
mutex:提供基本的锁定和解锁功能;
recursive_mutex:递归互斥锁,允许同一个线程多次锁定同一个互斥锁,避免自死锁。
timed_mutex:带超时功能的互斥锁,可以尝试锁定一段时间,避免永久阻塞。
recursive_timed_mutex:结合递归和超时功能的互斥锁.
锁管理器 (RAII机制)
- lock_guard:轻量级,自动释放,构造时加锁
- unique_lock:支持延时锁定,手动锁定/解锁,所有权转移
- 延迟锁定:
std::unique_lock<std::mutex> lock(mtx, std::defer_lock);
- 延迟锁定:
- scoped_lock - 多锁管理 C++ 17
std::mutex mtx1, mtx2, mtx3; // 同时锁定多个互斥锁,避免死锁 std::scoped_lock lock(mtx1, mtx2, mtx3);
atomic内存序
内存序(Memory Order)是因为编译器和 CPU 为了性能,会进行指令重排(Instruction Reordering)。
memory_order_relaxed
relaxed读/写无同步,仅保证操作原子化(常用于计数器)。
consume读比 acquire 更轻,只同步有数据依赖的变量(不推荐初学者使用)。
acquire读配合 release,防止读操作被重排到后面。
release写配合 acquire,防止写操作被重排到前面。
acq_rel读-改-写同时具有 acquire 和 release 的特性。
seq_cst读/写最严格,所有线程看到完全一致的顺序。默认。
实例:
1 #include <iostream> 2 #include <thread> 3 #include <chrono> 4 #include <functional> 5 #include <future> 6 #include <mutex> 7 #include <atomic> 8 9 using namespace std; 10 //atomic<int> shared_data(0); atomic适用于基本类型 11 int shared_data = 0; 12 mutex g_mutex; 13 14 //测试不准确 15 //不加锁 0ms 结果错误 16 //atomic 3ms 17 //mutex 加锁 12ms 18 //lock_guard 13ms 19 //unique_lock 15ms 20 21 void addValue() { 22 for(int i = 0;i<100000;++i){ 23 // mutex加锁 24 // g_mutex.lock(); 25 // ++shared_data; 26 // g_mutex.unlock(); 27 28 unique_lock<mutex> lock(g_mutex,defer_lock); 29 lock.lock(); 30 ++shared_data; 31 lock.unlock(); 32 33 } 34 } 35 36 37 int main(){ 38 auto startTime = chrono::high_resolution_clock::now(); 39 cout<<"main fun! "<<endl; 40 thread threadA = thread(addValue); 41 thread threadB = thread(addValue); 42 threadA.join(); 43 threadB.join(); 44 auto stopTime = chrono::high_resolution_clock::now(); 45 auto duration = chrono::duration_cast<chrono::milliseconds>(stopTime-startTime).count(); 46 cout<<"sharedData: "<<shared_data<<" 时间:"<<duration<< "ms"<<endl; 47 return 0; 48 49 }读写锁
允许多个读线程同时访问共享资源,但只允许一个写线程独占访问
- 读锁(共享锁):多个线程可以同时持有读锁
- 写锁(独占锁):同一时间只能有一个线程持有写锁,且持有写锁时不能有读锁
// 读写锁的状态转换 // 无锁状态 -> 可以加读锁或写锁 // 有读锁时 -> 可以再加读锁,不能加写锁 // 有写锁时 -> 不能加读锁,也不能加写锁 // 写者优先(Writer-preference) 或防止写饥饿(Write starvation prevention) 的策略。1 #include <iostream> 2 #include <thread> 3 #include <shared_mutex> 4 #include <vector> 5 #include <chrono> 6 7 class ThreadCounter { 8 private: 9 mutable std::shared_mutex mutex_; 10 int value_ = 0; 11 12 public: 13 // 读取操作:使用共享锁 14 int read(int i) const { 15 std::cout<<"读线程调用!"<<std::endl; 16 std::shared_lock<std::shared_mutex> lock(mutex_); // 共享锁 17 std::cout << " (线程ID: " << std::this_thread::get_id() << ") 读操作 顺序号:"<< i << std::end l; 18 std::this_thread::sleep_for(std::chrono::milliseconds(1000)); 19 return value_; 20 } 21 22 // 写入操作:使用独占锁 23 void write(int i) { 24 std::cout<< "写线程调用!"<<std::endl; 25 std::unique_lock<std::shared_mutex> lock(mutex_); // 独占锁 26 std::cout << " (线程ID: " << std::this_thread::get_id() << ") 写操作 顺序号:" << i << std::en dl; 27 std::this_thread::sleep_for(std::chrono::milliseconds(5000)); 28 ++value_; 29 } 30 31 // 写入操作:重置值 32 void reset() { 33 std::unique_lock<std::shared_mutex> lock(mutex_); // 独占锁 34 std::cout << " (线程ID: " << std::this_thread::get_id() << ") 重置操作" << std::endl; 35 std::this_thread::sleep_for(std::chrono::milliseconds(10)); 36 value_ = 0; 37 } 38 }; 39 40 int main() { 41 std::cout << "=== 基本读写锁示例 ===" << std::endl; 42 43 ThreadCounter counter; 44 std::vector<std::thread> threads; 45 46 // 启动多个读线程 47 for (int i = 0; i < 5; ++i) { 48 threads.emplace_back([&counter, i]() { 49 for (int j = 0; j < 3; ++j) { 50 counter.read(i); 51 } 52 }); 53 } 54 55 // 启动写线程 56 threads.emplace_back([&counter]() { 57 for (int i = 0; i < 2; ++i) { 58 counter.write(i); 59 } 60 }); 61 62 for (auto& t : threads) { 63 t.join(); 64 } 65 return 0; 66 }条件变量 condition_variable
条件变量实现多个线程间的同步操作,当条件不满足时,相关线程被一直阻塞,直到某种条件出现,这些线程才会被唤醒
典型流程
mutex 条件变量运行状态切换时的同步
condition_variable 等待/唤醒
共享数据
条件
1 #include <iostream> 2 #include <queue> 3 #include <thread> 4 #include <mutex> 5 #include <condition_variable> 6 #include <vector> 7 8 class ProducerConsumer { 9 private: 10 std::queue<int> queue; // 共享资源:缓冲区队列 11 std::mutex mtx; // 互斥锁,保护队列 12 std::condition_variable cv_prod; // 条件变量:控制生产者(当队列满时等待) 13 std::condition_variable cv_cons; // 条件变量:控制消费者(当队列空时等待) 14 size_t capacity; // 缓冲区最大容量 15 16 public: 17 explicit ProducerConsumer(size_t capacity) : capacity(capacity) {} 18 19 // 生产者调用的入队函数 20 void prod(int value) { 21 // 获取锁:保护共享资源 queue 22 std::unique_lock<std::mutex> lock(mtx); 23 24 // 等待判断:如果队列满了,生产者阻塞并释放锁,直到消费者消费后唤醒 25 // 使用 lambda 表达式防止虚假唤醒 26 //wait()的谓词返回true时继续等待,返回false时才退出等待 27 cv_prod.wait(lock, [this]() { return queue.size() < capacity; }); 28 29 // 执行生产 30 queue.push(value); 31 std::cout << "Produced: " << value << " | Queue size: " << queue.size() << std::endl; 32 33 // 唤醒:告诉正在等待的消费者,现在有货了 34 cv_cons.notify_one(); 35 36 // 作用域结束,lock 自动析构并释放锁 37 } 38 39 // 消费者调用的出队函数 40 int cons() { 41 std::unique_lock<std::mutex> lock(mtx); 42 43 // 等待判断:如果队列空了,消费者阻塞并释放锁,直到生产者生产后唤醒 44 cv_cons.wait(lock, [this]() { return !queue.empty(); }); 45 46 // 执行消费 47 int value = queue.front(); 48 queue.pop(); 49 std::cout << "Consumed: " << value << " | Queue size: " << queue.size() << std::endl; 50 51 // 通知:告诉正在等待的生产者,现在有空位了 52 cv_prod.notify_one(); 53 54 return value; 55 } 56 }; 57 58 // --- 测试代码 --- 59 void producer_task(ProducerConsumer& q, int id) { 60 for (int i = 0; i < 5; ++i) { 61 q.prod(id * 100 + i); // 生产数据 62 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟生产耗时 63 } 64 } 65 66 void consumer_task(ProducerConsumer& q) { 67 for (int i = 0; i < 10; ++i) { 68 q.cons(); // 消费数据 69 std::this_thread::sleep_for(std::chrono::milliseconds(150)); // 模拟消费耗时 70 } 71 } 72 73 int main() { 74 ProducerConsumer q(5); // 缓冲区容量为 3 75 76 // 开启 2 个生产者线程和 1 个消费者线程 77 std::thread p1(producer_task, std::ref(q), 1); 78 std::thread p2(producer_task, std::ref(q), 2); 79 std::thread c1(consumer_task, std::ref(q)); 80 81 p1.join(); 82 p2.join(); 83 c1.join(); 84 85 return 0; 86 }