Redis 缓存一致性指的是缓存数据与数据库数据保持同步,避免出现缓存数据过时、与数据库数据不匹配的情况。
策略 | 核心思想 | 一致性强度 | 性能影响 | 实现复杂度 | 适用场景 |
---|---|---|---|---|---|
Cache-Aside | 应用层主动管理缓存:读时延后加载,写时更新DB并删除缓存 | 最终一致性 | 读操作可能延迟 | 低 | 最常用,读多写少,如用户信息、商品信息 |
Write-Through | 将缓存作为主要数据入口,所有写操作同步更新缓存和数据库 | 强一致性 | 写性能较低 | 中 | 写多读少,对一致性要求极高,如金融账户 |
Write-Behind | 写操作先更新缓存,随后异步批量更新数据库 | 最终一致性 | 写性能高 | 高 | 写操作密集,可容忍数据丢失,如日志、统计 |
Read-Through | 应用只读缓存,由缓存组件自身负责未命中时从数据库加载 | 取决于背后的写策略 | 读操作平均延迟低 | 中 | 希望简化应用逻辑,有相应缓存组件支持 |
1、Cache-Aside (旁路缓存) - 最常用模式
这是最广泛采用的模式,由应用程序显式地管理缓存和数据库的交互。
读流程:
- 接收读请求。
- 首先尝试从 Redis 中读取数据。
- 如果命中(Cache Hit),直接返回数据。
- 如果未命中(Cache Miss),则从数据库查询。
- 将数据库返回的数据写入 Redis(称为“回填”),然后返回响应。
写流程:
- 接收写请求。
- 更新数据库。
删除
(而非更新)Redis 中对应的缓存键。
为何删除缓存,而不是更新它?
在并发场景下,如果采用更新缓存的方式,可能会因为操作时序问题导致缓存中被写入旧数据。
删除缓存
是一种更安全、更简单的策略,它确保后续的读请求能从数据库加载最新数据并重新回填缓存。优点
:实现简单,对业务代码侵入性低,缓存中只保留真正被请求的热点数据。缺点
:在极端的并发情况下,仍可能出现短时间的数据不一致(可通过“延迟双删
”策略优化)。代码示例
import redis import pymysql import json import time # 初始化Redis和数据库连接 r = redis.Redis(host='localhost', port=6379, db=0) db = pymysql.connect(host='localhost', user='user', password='pass', database='mydb') def get_product(product_id): """读取产品信息""" cache_key = f"product:{product_id}" # 1. 尝试从缓存获取 product_data = r.get(cache_key) if product_data: print(f"Cache hit for product {product_id}") return json.loads(product_data) print(f"Cache miss for product {product_id}") # 2. 缓存未命中,查询数据库 cursor = db.cursor() cursor.execute("SELECT * FROM products WHERE id = %s", (product_id,)) product = cursor.fetchone() cursor.close() if not product: # 3. 数据库也没有,缓存空值防止穿透 r.setex(cache_key, 300, json.dumps({"status": "not_found"})) # 5分钟空值缓存 return None # 4. 数据库有数据,写入缓存 product_dict = { 'id': product[0], 'name': product[1], 'price': float(product[2]), 'stock': product[3] } r.setex(cache_key, 3600, json.dumps(product_dict)) # 缓存1小时 return product_dict def update_product(product_id, new_data): """更新产品信息""" cache_key = f"product:{product_id}" # 1. 更新数据库 cursor = db.cursor() cursor.execute( "UPDATE products SET name = %s, price = %s, stock = %s WHERE id = %s", (new_data['name'], new_data['price'], new_data['stock'], product_id) ) db.commit() cursor.close() # 2. 删除缓存 r.delete(cache_key) print(f"Cache invalidated for product {product_id}") # 3. 可选:预热缓存 # get_product(product_id) # 立即重新加载到缓存
缓存双删代码示例
这第二次删除是为了清除在“更新数据库”到“第一次删除缓存”这个极短时间窗口内,可能被其他读请求回填到缓存中的旧数据。
import threading def update_product_with_double_delete(product_id, new_data): """使用延迟双删策略更新产品""" cache_key = f"product:{product_id}" # 第一次删除缓存 r.delete(cache_key) print(f"First cache deletion for product {product_id}") # 更新数据库 cursor = db.cursor() cursor.execute( "UPDATE products SET name = %s, price = %s, stock = %s WHERE id = %s", (new_data['name'], new_data['price'], new_data['stock'], product_id) ) db.commit() cursor.close() # 延迟第二次删除 def delayed_delete(): time.sleep(0.5) # 延迟500ms r.delete(cache_key) print(f"Second cache deletion for product {product_id}") threading.Thread(target=delayed_delete).start()
2、Write-Through (写穿透) - 强一致性模式
在此模式下,缓存被视为主要的数据源。所有写操作都同步地经过缓存,由缓存组件负责同时更新缓存和数据库。
写流程:
- 应用将数据写入缓存。
- 缓存组件同步地将数据写入底层数据库。
- 写入成功后返回。
优点
:保证了强一致性,读写操作都面对缓存,性能通常优于直接读数据库。缺点
:写延迟较高(因为需要等待两个写操作完成),且需要缓存组件或中间件支持此模式。示例代码
class WriteThroughCache: def __init__(self): self.redis = redis.Redis(host='localhost', port=6379, db=0) self.db = pymysql.connect(host='localhost', user='user', password='pass', database='mydb') def set(self, key, value, ttl=None): """写入数据(同时更新缓存和数据库)""" # 1. 更新数据库 self._update_db(key, value) # 2. 更新缓存 if ttl: self.redis.setex(key, ttl, json.dumps(value)) else: self.redis.set(key, json.dumps(value)) def get(self, key): """读取数据""" # 直接从缓存读取 data = self.redis.get(key) if data: return json.loads(data) return None def _update_db(self, key, value): """更新数据库(根据key类型处理)""" if key.startswith("product:"): product_id = key.split(":")[1] cursor = self.db.cursor() cursor.execute( "UPDATE products SET name = %s, price = %s, stock = %s WHERE id = %s", (value['name'], value['price'], value['stock'], product_id) ) self.db.commit() cursor.close() # 使用示例 cache = WriteThroughCache() product_data = {'id': 123, 'name': 'New Product', 'price': 29.99, 'stock': 100} cache.set("product:123", product_data, ttl=3600) # 读取 cached_product = cache.get("product:123")
3、Write-Behind (写回) - 高性能写模式
与 Write-Through 类似,写操作首先更新缓存。但不同的是,对数据库的更新是异步批量进行的。
写流程:
- 应用将数据写入缓存,写入成功后立即返回。
- 缓存组件在后台异步地、批量地将一段时间内累积的写操作刷新到数据库。
优点
:写性能极高,非常适合写操作非常密集的场景。缺点
:有数据丢失的风险(如果缓存宕机,尚未持久化到数据库的数据就会丢失),只能保证最终一致性。实现复杂度最高。示例代码
class WriteBehindCache: def __init__(self): self.redis = redis.Redis(host='localhost', port=6379, db=0) self.db = pymysql.connect(host='localhost', user='user', password='pass', database='mydb') self.write_queue = [] self.batch_size = 100 self.batch_interval = 10 # 秒 self.running = True self.lock = threading.Lock() # 启动后台批处理线程 self.batch_thread = threading.Thread(target=self._batch_writer) self.batch_thread.daemon = True self.batch_thread.start() def set(self, key, value): """写入数据(先更新缓存,异步更新数据库)""" # 1. 立即更新缓存 self.redis.set(key, json.dumps(value)) # 2. 将数据库更新加入队列 with self.lock: self.write_queue.append((key, value)) def get(self, key): """读取数据""" data = self.redis.get(key) if data: return json.loads(data) return None def _batch_writer(self): """后台批处理线程""" while self.running: time.sleep(self.batch_interval) if not self.write_queue: continue # 获取当前批量的数据 with self.lock: batch = self.write_queue[:self.batch_size] self.write_queue = self.write_queue[self.batch_size:] if not batch: continue try: # 批量更新数据库 cursor = self.db.cursor() for key, value in batch: if key.startswith("product:"): product_id = key.split(":")[1] cursor.execute( "INSERT INTO products (id, name, price, stock) VALUES (%s, %s, %s, %s) " "ON DUPLICATE KEY UPDATE name=%s, price=%s, stock=%s", (product_id, value['name'], value['price'], value['stock'], value['name'], value['price'], value['stock']) ) self.db.commit() cursor.close() print(f"Batch updated {len(batch)} items to database") except Exception as e: print(f"Batch update failed: {e}") # 将失败的任务重新加入队列 with self.lock: self.write_queue = batch + self.write_queue def stop(self): """停止服务""" self.running = False self.batch_thread.join() # 处理剩余队列 self._batch_writer() # 使用示例 cache = WriteBehindCache() # 高频率写入 for i in range(1000): product_data = {'id': i, 'name': f'Product {i}', 'price': i*1.1, 'stock': i%100} cache.set(f"product:{i}", product_data) # 停止服务时 cache.stop()
4、 Read-Through (读穿透)
此模式是 Cache-Aside 的变体,旨在简化应用逻辑。应用只从缓存读数据,当缓存未命中时,由缓存组件自身(而不是应用程序)负责从数据库加载数据并回填,然后返回给应用。
读流程:
- 应用向缓存请求数据。
- 如果缓存命中,直接返回。
- 如果缓存未命中,缓存组件从数据库加载数据,存入缓存,然后返回。
优点
:将缓存逻辑从应用中解耦,应用代码更简洁。缺点
:需要缓存组件(或智能客户端)支持此功能。示例代码
class ReadThroughCache: def __init__(self): self.redis = redis.Redis(host='localhost', port=6379, db=0) self.db = pymysql.connect(host='localhost', user='user', password='pass', database='mydb') def get(self, key): """读取数据(缓存未命中时自动加载)""" # 1. 尝试从缓存获取 data = self.redis.get(key) if data: return json.loads(data) # 2. 缓存未命中,从数据库加载 value = self._load_from_db(key) if value is None: # 缓存空值防止穿透 self.redis.setex(key, 300, json.dumps({"status": "not_found"})) return None # 3. 写入缓存 self.redis.setex(key, 3600, json.dumps(value)) return value def _load_from_db(self, key): """从数据库加载数据""" if key.startswith("product:"): product_id = key.split(":")[1] cursor = self.db.cursor() cursor.execute("SELECT * FROM products WHERE id = %s", (product_id,)) product = cursor.fetchone() cursor.close() if product: return { 'id': product[0], 'name': product[1], 'price': float(product[2]), 'stock': product[3] } return None # 使用示例 cache = ReadThroughCache() product = cache.get("product:123")
这一切,似未曾拥有