How Dify Resolves the Weaviate Schema Incompatibility Between 1.19.0 and 1.27.0+ (with Migration Script)

熊猫办公

Word count: 2,834 · about 15 minutes to read

Dify's Weaviate data migration is done with the migrate_weaviate_collections.py script.

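What does this script do, and how exactly does it resolve the version incompatibility? We first go through each function in the script, then look at the overall architecture, the environment variable configuration and the solution steps. Finally, we walk through what the execution log reports.

Function details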

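  1. identify_old_collections(client: weaviate.WeaviateClient) -> List[str]
    • Purpose: identify the old collections that need migration (those without a vectorConfig).
    • Implementation
      • List all collections and check only those whose names start with Vector_index_.
      • If a collection's configuration has no vectorConfig, add it to the list of collections to migrate.
  2. get_collection_schema(client: weaviate.WeaviateClient, collection_name: str) -> Dict[str, Any]
    • Purpose: fetch the full schema of a given collection through the REST API.
    • Implementation
      • Send a GET request to Weaviate's schema endpoint (/v1/schema/{collection_name}) and return the collection's schema as JSON.
  3. create_new_collection(client: weaviate.WeaviateClient, old_name: str, schema: Dict[str, Any]) -> str
    • Purpose: create a new collection with the updated schema.
    • Implementation
      • Build the new collection name by appending _migrated to the old name.
      • Generate the new schema, making sure it contains a correct vectorConfig.
      • Create the new collection through the REST API and return its name.
  4. migrate_collection_data(client: weaviate.WeaviateClient, old_collection_name: str, new_collection_name: str) -> int
    • Purpose: copy the data from the old collection to the new one using cursor-based pagination.
    • Implementation
      • Fetch objects from the old collection page by page with a cursor and batch-insert them into the new collection.
      • Return the total number of migrated objects.
  5. verify_migration(client: weaviate.WeaviateClient, old_collection_name: str, new_collection_name: str)
    • Purpose: check whether the migration succeeded.
    • Implementation
      • Query the object counts of the old and new collections and compare them.
      • Print the verification result.
  6. replace_old_collection(client: weaviate.WeaviateClient, old_collection_name: str, new_collection_name: str)
    • Purpose: replace the old collection with the new one.
    • Implementation
      • Delete the old collection.
      • Fetch the new collection's schema and change its class name to the old collection's name.
      • Create a collection with the old collection's name and copy the data into it.
      • Delete the temporary new collection.
  7. migrate_all_collections()
    • Purpose: the main entry point for the whole migration.
    • Implementation
      • Connect to Weaviate with the settings read from environment variables.
      • Call identify_old_collections, then for each collection run get_collection_schema, create_new_collection, migrate_collection_data and verify_migration; if verification passes, call replace_old_collection.
      • Print a summary and close the client connection.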
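Functions 1 and 3 come down to two operations on a class definition: deciding whether it is old-style, and building its replacement. The sketch below restates the `new_schema` dictionary from `create_new_collection` as pure functions, so the transformation can be inspected without a running Weaviate. Two assumptions to flag: it checks the REST schema JSON (what `get_collection_schema` returns) for a non-empty `vectorConfig` key, whereas the script itself checks `config.vector_config` through the Python client; and the helper names and the sample class definition are hypothetical.

```python
import copy
from typing import Any, Dict

# Same index settings the script uses for the "default" named vector
DEFAULT_NAMED_VECTOR: Dict[str, Any] = {
    "vectorizer": {"none": {}},
    "vectorIndexType": "hnsw",
    "vectorIndexConfig": {
        "distance": "cosine",
        "ef": -1,
        "efConstruction": 128,
        "maxConnections": 32,
    },
}


def is_old_schema(class_schema: Dict[str, Any]) -> bool:
    """Assumption: a 1.19.0-era class has no (or an empty) vectorConfig."""
    return not class_schema.get("vectorConfig")


def build_new_schema(old_name: str, old_schema: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: '<old_name>_migrated' with a 'default' named vector."""
    return {
        "class": f"{old_name}_migrated",
        # No class-level "vectorizer": with named vectors it is set per vector
        "vectorConfig": {"default": copy.deepcopy(DEFAULT_NAMED_VECTOR)},
        # Properties are carried over unchanged
        "properties": copy.deepcopy(old_schema.get("properties", [])),
    }


old = {
    "class": "Vector_index_example_Node",  # hypothetical collection
    "vectorizer": "none",
    "properties": [{"name": "text", "dataType": ["text"]}],
}
new = build_new_schema(old["class"], old)
print(is_old_schema(old), is_old_schema(new), new["class"])
# prints: True False Vector_index_example_Node_migrated
```

Only the vector configuration changes; the property list is copied as-is, which is why the migrated objects can keep their original properties.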

Overall architecture

The overall architecture of the script is shown below:

[Figure: overall architecture of the migration script]
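Environment variable configuration

Before the actual solution steps, the script reads Weaviate's connection settings from three environment variables: WEAVIATE_ENDPOINT, WEAVIATE_GRPC_ENDPOINT and WEAVIATE_API_KEY. The REST host and port are parsed from WEAVIATE_ENDPOINT and the gRPC port from WEAVIATE_GRPC_ENDPOINT. The defaults (http://weaviate:8080 and grpc://weaviate:50051) point at a host named weaviate, which suits running the script inside Dify's Worker container, as the script's docstring notes.

Solution steps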
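Before step 1 can run, these variables have to become connection parameters. The script does this with `str.split`, which assumes endpoints of exactly the form `scheme://host:port`; a trailing slash such as `http://weaviate:8080/` would make `int()` fail. A minimal sketch of a more tolerant parser using the standard library's `urllib.parse.urlsplit` follows. The helpers `parse_endpoint` and `connection_params` are hypothetical, not part of the script, and, as in the script, the gRPC host is assumed to be the same as the REST host.

```python
from typing import Dict, Mapping, Tuple
from urllib.parse import urlsplit


def parse_endpoint(url: str, default_port: int) -> Tuple[str, int]:
    """Split 'scheme://host[:port][/path]' into (host, port)."""
    parts = urlsplit(url)
    # hostname drops scheme, port and any path; fall back if it is missing
    host = parts.hostname or "localhost"
    # parts.port is None when the URL has no explicit port
    port = parts.port if parts.port is not None else default_port
    return host, port


def connection_params(env: Mapping[str, str]) -> Dict[str, object]:
    """Hypothetical helper: build connect_to_local() arguments from env vars."""
    host, port = parse_endpoint(
        env.get("WEAVIATE_ENDPOINT", "http://weaviate:8080"), 8080
    )
    # Like the script, only the port is taken from the gRPC endpoint
    _, grpc_port = parse_endpoint(
        env.get("WEAVIATE_GRPC_ENDPOINT", "grpc://weaviate:50051"), 50051
    )
    return {
        "host": host,
        "port": port,
        "grpc_port": grpc_port,
        "api_key": env.get("WEAVIATE_API_KEY", ""),
    }
```

Passing `os.environ` as `env` would reproduce the script's settings apart from the API-key default (the sketch uses an empty string instead of a hard-coded key); the resulting values map onto the `host`, `port`, `grpc_port` and `auth_credentials` arguments that `migrate_all_collections` passes to `weaviate.connect_to_local`.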

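  1. Identify old collections
    • The script first calls identify_old_collections to find every collection that needs migrating. These collections lack a vectorConfig, which is the main cause of the schema incompatibility.
  2. Fetch the old collection schema
    • For each old collection found, the script calls get_collection_schema to retrieve its full schema, so that the collection's properties can be copied correctly during the migration.
  3. Create the new collection
    • create_new_collection creates a new collection based on the old schema. The new collection gets a proper vectorConfig, in particular a named vector configuration called "default" (no vectorizer, HNSW index, cosine distance). This change is the key to resolving the incompatibility.
  4. Migrate the data
    • migrate_collection_data copies the data from the old collection to the new one. It pages through the old collection with a cursor (the UUID of the last object in each batch) rather than an offset, because offset-based paging in Weaviate is capped by the server's QUERY_MAXIMUM_RESULTS limit; each page is written with batch inserts that keep the objects' UUIDs, properties and vectors.
  5. Verify the result
    • verify_migration compares the object counts of the old and new collections, and matching counts are treated as success. Note that this only checks counts; it does not compare property values or vectors.
  6. Replace the old collection
    • Once verification passes, replace_old_collection deletes the old collection, recreates it under its original name with the new schema, and copies the data back from the migrated collection. The collection therefore keeps the name Dify's knowledge base refers to, but no longer uses the old schema. If this step fails, the data is still in the _migrated collection and the script prints instructions for finishing the switch manually.
  7. Clean up the temporary collection
    • Finally, the temporary _migrated collection is deleted (this is the last step inside replace_old_collection), so no leftover collections remain.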
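Steps 4 and 6 share the same cursor loop: fetch up to BATCH_SIZE objects after the last UUID seen, and stop on an empty or short page. A minimal, self-contained sketch of that loop follows. `fetch_page` stands in for `collection.query.fetch_objects(limit=..., after=...)` and `insert` for `batch.add_object`; both, like the in-memory fake source, are hypothetical. The fake returns objects in UUID order, which is how we assume Weaviate's `after` cursor walks a collection. `default_vector` mirrors the script's handling of `obj.vector`, which may come back as a dict keyed by vector name.

```python
import uuid
from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Union

Obj = Dict[str, Any]
FetchPage = Callable[[Optional[str], int], List[Obj]]
Insert = Callable[[str, Dict[str, Any], List[float]], None]
Vector = Union[Sequence[float], Dict[str, Sequence[float]]]


def iter_pages(fetch_page: FetchPage, batch_size: int) -> Iterator[List[Obj]]:
    """Yield pages until an empty or short page is returned."""
    cursor: Optional[str] = None
    while True:
        page = fetch_page(cursor, batch_size)
        if not page:
            return
        yield page
        if len(page) < batch_size:  # short page: nothing left to fetch
            return
        cursor = page[-1]["uuid"]  # resume after the last object seen


def default_vector(vector: Vector) -> List[float]:
    """Take the 'default' entry if the vector is keyed by name."""
    if isinstance(vector, dict):
        return list(vector["default"])
    return list(vector)


def migrate(fetch_page: FetchPage, insert: Insert, batch_size: int) -> int:
    """Copy every object page by page; return how many were copied."""
    total = 0
    for page in iter_pages(fetch_page, batch_size):
        for obj in page:
            insert(obj["uuid"], obj["properties"], default_vector(obj["vector"]))
        total += len(page)
    return total


def in_memory_source(objects: List[Obj]) -> FetchPage:
    """Hypothetical stand-in for fetch_objects(limit=..., after=...)."""
    ordered = sorted(objects, key=lambda o: o["uuid"])

    def fetch_page(after: Optional[str], limit: int) -> List[Obj]:
        rest = ordered if after is None else [o for o in ordered if o["uuid"] > after]
        return rest[:limit]

    return fetch_page


objs = [
    {
        "uuid": str(uuid.UUID(int=i)),
        "properties": {"text": f"chunk {i}"},
        # Odd objects carry a named-vector dict, even ones a plain list
        "vector": {"default": [float(i)]} if i % 2 else [float(i)],
    }
    for i in range(2500)
]
target: Dict[str, Any] = {}


def insert(obj_uuid: str, properties: Dict[str, Any], vector: List[float]) -> None:
    target[obj_uuid] = (properties, vector)


count = migrate(in_memory_source(objs), insert, batch_size=1000)
print(count, len(target))  # 2500 2500
```

With 2,500 objects and a batch size of 1,000 the loop makes three fetches (1,000, 1,000, 500) and stops on the short page; when the total is an exact multiple of the batch size, one extra fetch returns an empty page and ends the loop.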

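Summary

With these steps the script fixes the Weaviate schema incompatibility automatically: it moves every old-style Vector_index_ collection to a schema with a "default" named vector, checks that the object counts match before touching the original collection, and only then swaps the migrated collection in under the original name.

Interpreting the script's run log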

[Figure: sample run log of the migration script]
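  1. Migration starts
    • The log shows that collection Vector_index_fe5e3eb6_3e1a_4bcd_b2bc_c2e78a290ebd_Node is being migrated.
  2. New collection created
    • A new collection, Vector_index_fe5e3eb6_3e1a_4bcd_b2bc_c2e78a290ebd_Node_migrated, is created.
    • The log then confirms that the new collection was created successfully.
  3. Data migration
    • Data is migrated from the old collection Vector_index_fe5e3eb6_3e1a_4bcd_b2bc_c2e78a290ebd_Node to the new collection.
    • 130 objects are migrated, and the total is logged.
  4. Verification
    • Verification results:
      • The old collection contains 130 objects.
      • The new collection also contains 130 objects.
    • Since the counts match, the migration status is reported as SUCCESS.
  5. Replacing the old collection
    • The old collection is replaced with the migrated data:
      • Step 1: delete the old collection (succeeded).
      • Step 2: fetch the schema from the migrated collection.
      • Step 3: create a collection with the original name (succeeded).
      • Step 4: copy the data from the migrated collection into the newly created collection (130 objects copied).
      • Step 5: clean up the temporary migrated collection (succeeded).
  6. Migration succeeded
    • The log confirms that Vector_index_fe5e3eb6_3e1a_4bcd_b2bc_c2e78a290ebd_Node now has the new schema and contains 130 objects.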

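The last line of the log shows that the whole run finished with 335 collections migrated. According to the log, every step (creating the new collection, migrating the data, verifying, replacing the old collection and cleaning up the temporary one) succeeded for each collection. Keep in mind that the "Collections migrated" figure in the summary is simply len(collections_to_migrate), the number of collections identified at the start: a collection that failed and was skipped is still counted, so it is worth searching the log for "Error migrating" and "Warning"/"WARNING" lines.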
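The five replacement steps in the log are easiest to follow as operations on a toy store. The sketch below is a hypothetical in-memory model (a dict mapping collection names to a schema and an object list), not Weaviate's API. It replays verify-then-replace for one collection and shows why the data is written twice: once into the `_migrated` collection and once more into the recreated original. It also makes the risky window visible: between step 1 and step 4 the data exists only in the `_migrated` collection, which is why the script calls `replace_old_collection` only after `verify_migration` returns True.

```python
import copy
from typing import Any, Dict, List

# Hypothetical toy model: collection name -> {"schema": {...}, "objects": [...]}
Store = Dict[str, Dict[str, Any]]


def verify(store: Store, old_name: str, new_name: str) -> bool:
    """Compare object counts only, like verify_migration."""
    return len(store[old_name]["objects"]) == len(store[new_name]["objects"])


def replace(store: Store, old_name: str, migrated_name: str) -> int:
    # Step 1: delete the old collection
    del store[old_name]
    # Step 2: take the migrated schema and rename its class
    schema = copy.deepcopy(store[migrated_name]["schema"])
    schema["class"] = old_name
    # Step 3: create a collection with the original name
    store[old_name] = {"schema": schema, "objects": []}
    # Step 4: copy the objects across (the second write of the same data)
    store[old_name]["objects"].extend(copy.deepcopy(store[migrated_name]["objects"]))
    # Step 5: drop the temporary collection
    del store[migrated_name]
    return len(store[old_name]["objects"])


name = "Vector_index_example_Node"  # hypothetical collection name
objects: List[Dict[str, Any]] = [
    {"uuid": str(i), "vector": {"default": [0.1]}} for i in range(130)
]
store: Store = {
    name: {"schema": {"class": name}, "objects": copy.deepcopy(objects)},
    f"{name}_migrated": {
        "schema": {"class": f"{name}_migrated", "vectorConfig": {"default": {}}},
        "objects": copy.deepcopy(objects),
    },
}

copied = 0
if verify(store, name, f"{name}_migrated"):
    copied = replace(store, name, f"{name}_migrated")
print(sorted(store), copied)  # ['Vector_index_example_Node'] 130
```

After the run only the original name remains, now carrying the `vectorConfig` of the migrated schema, which matches the final "now has the new schema with 130 objects" line in the log.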

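Appendix: migration script

Three versions of the migration script are provided: Dify's official one, a backup in the author's personal Git account, and a copy below that readers who cannot reach GitHub can paste directly. Use whichever suits you; if you can download it directly with wget, do that.

Dify's Weaviate migration script [1]
Backup of the Weaviate migration script [2]

"""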
Migration script to fix Weaviate schema incompatibility between 1.19.0 and 1.27.0+
This script:
- Identifies collections with old schema (no vectorConfig)
- Creates new collections with proper vectorConfig including "default" named vector
- Migrates data using cursor-based pagination (efficient for large datasets)
- Uses batch operations for fast inserts
- Preserves all object properties and vectors
Note:
- This is a community-edited version of the draft of the script presented by the Dify Team.
- This script is not officially supported by the Dify Team.
- The original source for this script can be found at https://github.com/langgenius/dify/issues/27291#issuecomment-3501003678.
- The changes made in this script are:
  - Retrieve Weaviate connection info from environment variables to make this script run in the Worker container.
  - Switch to cursor-based pagination in "replace_old_collection", since the migration could fail with large collections.
  - Fix an issue where both the old and new collections remained without being deleted after migrating an empty collection.
"""

import os
import weaviate
from weaviate.classes.config import Configure, VectorDistances
import sys
import time
from typing import List, Dict, Any

# Configuration
WEAVIATE_ENDPOINT = os.getenv("WEAVIATE_ENDPOINT", "http://weaviate:8080")
WEAVIATE_GRPC_ENDPOINT = os.getenv("WEAVIATE_GRPC_ENDPOINT", "grpc://weaviate:50051")
WEAVIATE_API_KEY = os.getenv("WEAVIATE_API_KEY", "WVF5YThaHlkYwhGUSmCRgsX3tD5ngdN8pkih")
BATCH_SIZE = 1000
# Host/port parsing expects endpoints of the form scheme://host:port (no trailing slash)
WEAVIATE_HOST = WEAVIATE_ENDPOINT.split("//")[-1].split(":")[0]
WEAVIATE_PORT = int(WEAVIATE_ENDPOINT.split(":")[-1])
WEAVIATE_GRPC_PORT = int(WEAVIATE_GRPC_ENDPOINT.split(":")[-1])

def identify_old_collections(client: weaviate.WeaviateClient) -> List[str]:
    """Identify collections that need migration (those without vectorConfig)"""
    collections_to_migrate = []

    all_collections = client.collections.list_all()
    print(f"Found {len(all_collections)} total collections")

    for collection_name in all_collections.keys():
        # Only check Vector_index collections (Dify knowledge bases)
        if not collection_name.startswith("Vector_index_"):
            continue

        collection = client.collections.get(collection_name)
        config = collection.config.get()

        # Check if this collection has the old schema
        if config.vector_config is None:
            collections_to_migrate.append(collection_name)
            print(f"  - {collection_name}: OLD SCHEMA (needs migration)")
        else:
            print(f"  - {collection_name}: NEW SCHEMA (skip)")

    return collections_to_migrate

def get_collection_schema(
    client: weaviate.WeaviateClient, collection_name: str
) -> Dict[str, Any]:
    """Get the full schema of a collection via REST API"""
    import requests

    # Use WEAVIATE_ENDPOINT like the other REST calls, so an https endpoint also works
    response = requests.get(
        f"{WEAVIATE_ENDPOINT}/v1/schema/{collection_name}",
        headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"},
    )

    if response.status_code == 200:
        return response.json()
    else:
        raise Exception(f"Failed to get schema: {response.text}")

def create_new_collection(
    client: weaviate.WeaviateClient, old_name: str, schema: Dict[str, Any]
) -> str:
    """Create a new collection with updated schema using REST API"""
    import requests

    # Generate new collection name
    new_name = f"{old_name}_migrated"

    print(f"Creating new collection: {new_name}")

    # Build new schema with proper vectorConfig
    # Note: When using vectorConfig (named vectors), we don't set class-level vectorizer
    new_schema = {
        "class": new_name,
        # This is the key: define vectorConfig with "default" named vector
        # Do NOT set class-level vectorizer when using vectorConfig
        "vectorConfig": {
            "default": {
                "vectorizer": {"none": {}},
                "vectorIndexType": "hnsw",
                "vectorIndexConfig": {
                    "distance": "cosine",
                    "ef": -1,
                    "efConstruction": 128,
                    "maxConnections": 32,
                },
            }
        },
        "properties": [],
    }

    # Copy properties from old schema
    if "properties" in schema:
        new_schema["properties"] = schema["properties"]

    # Create collection via REST API
    response = requests.post(
        f"{WEAVIATE_ENDPOINT}/v1/schema",
        json=new_schema,
        headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"},
    )

    if response.status_code not in [200, 201]:
        raise Exception(f"Failed to create collection: {response.text}")

    print(f"  Created new collection: {new_name}")
    return new_name

def migrate_collection_data(
    client: weaviate.WeaviateClient, old_collection_name: str, new_collection_name: str
) -> int:
    """Migrate data from old collection to new collection using cursor-based pagination"""

    old_collection = client.collections.get(old_collection_name)
    new_collection = client.collections.get(new_collection_name)

    total_migrated = 0
    cursor = None

    print(f"Migrating data from {old_collection_name} to {new_collection_name}")

    while True:
        # Fetch batch of objects using cursor-based pagination
        if cursor is None:
            # First batch
            response = old_collection.query.fetch_objects(
                limit=BATCH_SIZE, include_vector=True
            )
        else:
            # Subsequent batches using cursor
            response = old_collection.query.fetch_objects(
                limit=BATCH_SIZE, include_vector=True, after=cursor
            )

        objects = response.objects

        if not objects:
            break

        # Use batch insert for efficiency
        with new_collection.batch.dynamic() as batch:
            for obj in objects:
                # Prepare properties
                properties = obj.properties

                # Add object with vector
                batch.add_object(
                    properties=properties,
                    vector=(
                        obj.vector["default"]
                        if isinstance(obj.vector, dict)
                        else obj.vector
                    ),
                    uuid=obj.uuid,
                )

        total_migrated += len(objects)
        print(f"  Migrated {total_migrated} objects...")

        # Update cursor for next iteration
        if len(objects) < BATCH_SIZE:
            # Last batch
            break
        else:
            # Get the last object's UUID for cursor
            cursor = objects[-1].uuid

    print(f"  Total migrated: {total_migrated} objects")
    return total_migrated

def verify_migration(
    client: weaviate.WeaviateClient, old_collection_name: str, new_collection_name: str
) -> bool:
    """Verify that the migration was successful (compares object counts only)"""

    old_collection = client.collections.get(old_collection_name)
    new_collection = client.collections.get(new_collection_name)

    # Aggregate over all objects to get accurate counts
    old_count = old_collection.aggregate.over_all(total_count=True).total_count
    new_count = new_collection.aggregate.over_all(total_count=True).total_count

    print("\nVerification:")
    print(f"  Old collection ({old_collection_name}): {old_count} objects")
    print(f"  New collection ({new_collection_name}): {new_count} objects")

    if old_count == new_count:
        print("  Status: SUCCESS - Counts match!")
        return True
    else:
        print("  Status: WARNING - Counts don't match!")
        return False

def replace_old_collection(
    client: weaviate.WeaviateClient, old_collection_name: str, new_collection_name: str
) -> bool:
    """Replace old collection with migrated one by recreating with original name"""
    import requests

    print("\nReplacing old collection with migrated data...")

    # Step 1: Delete old collection
    print("  Step 1: Deleting old collection...")
    response = requests.delete(
        f"{WEAVIATE_ENDPOINT}/v1/schema/{old_collection_name}",
        headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"},
    )
    if response.status_code != 200:
        print(f"  Warning: Could not delete old collection: {response.text}")
    else:
        print("  Deleted")

    # Step 2: Get schema from migrated collection
    print("  Step 2: Getting schema from migrated collection...")
    schema_response = requests.get(
        f"{WEAVIATE_ENDPOINT}/v1/schema/{new_collection_name}",
        headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"},
    )
    if schema_response.status_code != 200:
        raise Exception(f"Failed to get migrated schema: {schema_response.text}")
    schema = schema_response.json()
    schema["class"] = old_collection_name

    # Step 3: Create collection with original name and new schema
    print("  Step 3: Creating collection with original name...")
    create_response = requests.post(
        f"{WEAVIATE_ENDPOINT}/v1/schema",
        json=schema,
        headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"},
    )
    if create_response.status_code not in [200, 201]:
        raise Exception(f"Failed to create collection: {create_response.text}")
    print("  Created")

    # Step 4: Copy data to collection with original name using cursor-based pagination
    print("  Step 4: Copying data to original collection name...")
    migrated_collection = client.collections.get(new_collection_name)
    new_collection = client.collections.get(old_collection_name)

    total_copied = 0
    cursor = None

    while True:
        # Fetch batch of objects using cursor-based pagination
        if cursor is None:
            # First batch
            response = migrated_collection.query.fetch_objects(
                include_vector=True, limit=BATCH_SIZE
            )
        else:
            # Subsequent batches using cursor
            response = migrated_collection.query.fetch_objects(
                include_vector=True, limit=BATCH_SIZE, after=cursor
            )

        objects = response.objects

        if not objects:
            break

        # Use batch insert for efficiency; obj.vector is keyed by vector name ("default")
        with new_collection.batch.dynamic() as batch:
            for obj in objects:
                batch.add_object(
                    properties=obj.properties, vector=obj.vector, uuid=obj.uuid
                )

        total_copied += len(objects)
        print(f"  Copied {total_copied} objects...")

        # Update cursor for next iteration
        if len(objects) < BATCH_SIZE:
            break
        else:
            cursor = objects[-1].uuid

    print(f"  Total copied: {total_copied} objects")

    # Step 5: Delete the temporary migrated collection
    print("  Step 5: Cleaning up temporary migrated collection...")
    response = requests.delete(
        f"{WEAVIATE_ENDPOINT}/v1/schema/{new_collection_name}",
        headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"},
    )
    if response.status_code == 200:
        print("  Cleaned up")

    print(
        f"\nSUCCESS! {old_collection_name} now has the new schema with {total_copied} objects"
    )
    return True

def migrate_all_collections():
    """Main migration function"""

    print("=" * 80)
    print("Weaviate Collection Migration Script")
    print("Migrating from Weaviate 1.19.0 schema to 1.27.0+ schema")
    print("=" * 80)
    print()

    client = weaviate.connect_to_local(
        host=WEAVIATE_HOST,
        port=WEAVIATE_PORT,
        grpc_port=WEAVIATE_GRPC_PORT,
        auth_credentials=weaviate.auth.AuthApiKey(WEAVIATE_API_KEY),
    )

    try:
        # Step 1: Identify collections that need migration
        print("Step 1: Identifying collections that need migration...")
        collections_to_migrate = identify_old_collections(client)

        if not collections_to_migrate:
            print("\nNo collections need migration. All collections are up to date!")
            return

        print(f"\nFound {len(collections_to_migrate)} collections to migrate:")
        for col in collections_to_migrate:
            print(f"  - {col}")

        # Describe the plan (the script proceeds without asking for confirmation)
        print("\nThis script will:")
        print("1. Create new collections with updated schema")
        print("2. Copy all data using efficient batch operations")
        print("3. Verify the migration")
        print("4. Optionally rename collections to activate the new ones")
        print()

        # Step 2: Migrate each collection
        for collection_name in collections_to_migrate:
            print("\n" + "=" * 80)
            print(f"Migrating: {collection_name}")
            print("=" * 80)

            try:
                # Get old schema
                schema = get_collection_schema(client, collection_name)

                # Create new collection
                new_collection_name = create_new_collection(
                    client, collection_name, schema
                )

                # Migrate data
                migrated_count = migrate_collection_data(
                    client, collection_name, new_collection_name
                )

                # Verify migration
                success = verify_migration(client, collection_name, new_collection_name)

                if success:
                    print(f"\nMigration successful for {collection_name}!")
                    print(f"New collection: {new_collection_name}")

                    # Automatically replace old collection with migrated one
                    try:
                        replace_old_collection(
                            client, collection_name, new_collection_name
                        )
                    except Exception as e:
                        print(
                            f"\nWarning: Could not automatically replace collection: {e}"
                        )
                        print("\nTo activate manually:")
                        print(f"1. Delete the old collection: {collection_name}")
                        print(f"2. Rename {new_collection_name} to {collection_name}")

            except Exception as e:
                print(f"\nError migrating {collection_name}: {e}")
                print("Skipping this collection and continuing...")
                continue

        print("\n" + "=" * 80)
        print("Migration Complete!")
        print("=" * 80)
        print("\nSummary:")
        # Number of collections identified for migration; skipped failures are not subtracted
        print(f"  Collections migrated: {len(collections_to_migrate)}")

    finally:
        client.close()

if __name__ == "__main__":
    try:
        migrate_all_collections()
    except KeyboardInterrupt:
        print("\n\nMigration interrupted by user.")
        sys.exit(1)
    except Exception as e:
        print(f"\n\nFatal error: {e}")
        import traceback

        traceback.print_exc()
        sys.exit(1)

References
[1] Dify's Weaviate migration script: https://github.com/langgenius/dify-docs/blob/main/assets/migrate_weaviate_collections.py
[2] Weaviate migration guide (Weaviate 迁移指南.md): https://github.com/LGRY/AI-Workflow-Hub/blob/main/Dify/Weaviate%20%E8%BF%81%E7%A7%BB%E6%8C%87%E5%8D%97.md

Practice is the best teacher; let's keep learning together.
