Merged
Changes from 1 commit
Commits
72 commits
be03897
New site search
SilasMarvin Jan 10, 2024
9df3528
Working fast site search and vector search
SilasMarvin Jan 13, 2024
f9cb8a1
Cleaned tests and remote fallback working for search and vector_search
SilasMarvin Jan 17, 2024
b04ead6
Clean up vector search
SilasMarvin Jan 17, 2024
44ab0ed
Switched to a transactional version of upsert documents and syncing p…
SilasMarvin Jan 17, 2024
9aaa31b
Working conditional pipeline running on document upsert
SilasMarvin Jan 18, 2024
6979f69
Really good upsert documents
SilasMarvin Jan 18, 2024
c8e1af8
Cleaned up some tests
SilasMarvin Jan 18, 2024
9df12b5
Switching old pipeline to be a pass through for the new multi field p…
SilasMarvin Jan 19, 2024
f75a2ec
Finished pipeline as a pass through and more tests
SilasMarvin Jan 22, 2024
59f4419
Working site search with doc type filtering
SilasMarvin Jan 22, 2024
ec351ff
Working site search with doc type filtering
SilasMarvin Jan 23, 2024
027080f
collection query_builder now a wrapper around collection.vector_search
SilasMarvin Jan 23, 2024
44cc8a0
Verifying on Python and JavaScript
SilasMarvin Jan 24, 2024
6a9fd14
Working with JavaScript and Python
SilasMarvin Jan 25, 2024
099ea60
Cleaned up
SilasMarvin Jan 25, 2024
412fb57
Move MultiFieldPipeline to Pipeline and added batch uploads for docum…
SilasMarvin Jan 25, 2024
9781766
Added SingleFieldPipeline function shoutout to Lev
SilasMarvin Jan 25, 2024
b87a654
Working on fixing query
SilasMarvin Jan 27, 2024
17b81e7
Working recursive query
SilasMarvin Feb 5, 2024
7339cd5
Added smarter chunking and search results table
SilasMarvin Feb 5, 2024
84e621a
Updated deps, added debugger for queries
SilasMarvin Feb 9, 2024
d745fc6
Logging search results done
SilasMarvin Feb 9, 2024
2d75d98
Correct return type with search inserts
SilasMarvin Feb 9, 2024
bed7144
Updated tests to pass with new sqlx version
SilasMarvin Feb 9, 2024
0e06ce1
Added a way for users to provide search_events
SilasMarvin Feb 12, 2024
1677a51
Quick fix on remote embeddings search
SilasMarvin Feb 12, 2024
a5599e5
Quick fix and change the upsert query to be more efficient
SilasMarvin Feb 13, 2024
f47002e
Fix for JS after updating tokio
SilasMarvin Feb 13, 2024
f39b94c
Updated extractive_question_answering example for Python
SilasMarvin Feb 13, 2024
f2c5f61
Updated question_answering for Python
SilasMarvin Feb 13, 2024
6ec6df5
Updated question_answering_instructor for Python
SilasMarvin Feb 13, 2024
c9a24e6
Updated semantic_search for Python
SilasMarvin Feb 14, 2024
6c7f05a
Updated summarizing_question_answering for Python
SilasMarvin Feb 14, 2024
119807f
Updated table question answering for Python
SilasMarvin Feb 14, 2024
71d4915
Updated table question answering for Python
SilasMarvin Feb 14, 2024
6dfd0d7
Updated rag question answering for Python
SilasMarvin Feb 14, 2024
70f1ac0
Updated question_answering for JavaScript
SilasMarvin Feb 14, 2024
67fae04
Updated question_answering_instructor for JavaScript
SilasMarvin Feb 14, 2024
0dd0027
Updated question_answering_instructor for JavaScript
SilasMarvin Feb 14, 2024
7afea01
Updated extractive_question_answering example for JavaScript
SilasMarvin Feb 14, 2024
95188a4
Updated summarizing_question_answering for JavaScript
SilasMarvin Feb 14, 2024
8807489
Updated semantic_search for JavaScript
SilasMarvin Feb 14, 2024
c9e5d04
Updated versions and removed unused clone
SilasMarvin Feb 14, 2024
c71143f
Cleaned up search query
SilasMarvin Feb 14, 2024
f4d261e
Edit test
SilasMarvin Feb 14, 2024
3d1a6ce
Added the stress test
SilasMarvin Feb 14, 2024
692c252
Updated to use new sdk
SilasMarvin Feb 14, 2024
fc5658f
Updated test
SilasMarvin Feb 15, 2024
4c38aca
Removed document_id
SilasMarvin Feb 16, 2024
4167e32
Removed document_id and updated all searches to work without it
SilasMarvin Feb 16, 2024
0cadd8c
Fixed python test
SilasMarvin Feb 16, 2024
077ce1b
Updated stress test
SilasMarvin Feb 16, 2024
7f53b93
Updated to clean up pool access
SilasMarvin Feb 16, 2024
144da42
Added test for bad collection names
SilasMarvin Feb 16, 2024
039c9cc
Cleaned up tests
SilasMarvin Feb 16, 2024
bd983cf
Add migration error
SilasMarvin Feb 26, 2024
4fb0149
Updated text
SilasMarvin Feb 26, 2024
b4f1edd
Add dockerfile to build javascript
SilasMarvin Feb 26, 2024
c41597a
Working dockerfile for build
SilasMarvin Feb 26, 2024
3f53e9c
Test github docker build
SilasMarvin Feb 26, 2024
679b995
Iterating on gh action
SilasMarvin Feb 26, 2024
c614e4e
Iterating on gh action
SilasMarvin Feb 26, 2024
7169596
Iterating on gh action
SilasMarvin Feb 26, 2024
8de7727
Iterating on gh action
SilasMarvin Feb 26, 2024
25fe41c
Iterating on gh action
SilasMarvin Feb 26, 2024
271e1e4
Updated collection test
SilasMarvin Feb 26, 2024
9e4c2a1
Finished boosting and working with the new sdk
SilasMarvin Feb 27, 2024
c46957c
Made document search just use semantic search and boosted title
SilasMarvin Feb 27, 2024
0d963a8
Updated the chatbot to use the new chat history
SilasMarvin Feb 27, 2024
d9b241d
Small cleanups
SilasMarvin Feb 27, 2024
a34619b
Adjust boosting
SilasMarvin Feb 27, 2024
Move MultiFieldPipeline to Pipeline and added batch uploads for documents
SilasMarvin committed Feb 28, 2024
commit 412fb571682ac758fb8fe05b04a8dc7fea672daf
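This commit's headline change: `MultiFieldPipeline` is renamed back to `Pipeline`, so a pipeline is now built from a name plus a JSON schema describing how each document field is indexed. Below is a minimal end-to-end sketch of the renamed Rust API; the constructor and method names come from this diff, but the schema keys and model names are hypothetical, and exact signatures may differ in released SDK versions.

use pgml::{Collection, Pipeline};
use serde_json::json;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Illustrative sketch, not part of this diff.
    // A collection stores documents; the pipeline schema says how to index them.
    let mut collection = Collection::new("demo_collection", None);

    // Hypothetical schema: full-text index `title`, chunk and embed `body`.
    let mut pipeline = Pipeline::new(
        "demo_pipeline",
        Some(
            json!({
                "title": { "full_text_search": { "configuration": "english" } },
                "body": {
                    "splitter": { "model": "recursive_character" },
                    "semantic_search": { "model": "intfloat/e5-small" }
                }
            })
            .into(),
        ),
    )?;

    // add_pipeline stores the pipeline and syncs it against existing documents.
    collection.add_pipeline(&mut pipeline).await?;
    Ok(())
}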
3 changes: 1 addition & 2 deletions pgml-sdks/pgml/build.rs
@@ -25,8 +25,7 @@ export function newCollection(name: string, database_url?: string): Collection;
 export function newModel(name?: string, source?: string, parameters?: Json): Model;
 export function newSplitter(name?: string, parameters?: Json): Splitter;
 export function newBuiltins(database_url?: string): Builtins;
-export function newPipeline(name: string, model?: Model, splitter?: Splitter, parameters?: Json): Pipeline;
-export function newMultiFieldPipeline(name: string, schema?: Json): MultiFieldPipeline;
+export function newPipeline(name: string, schema?: Json): Pipeline;
 export function newTransformerPipeline(task: string, model?: string, args?: Json, database_url?: string): TransformerPipeline;
 export function newOpenSourceAI(database_url?: string): OpenSourceAI;
 "#;
172 changes: 80 additions & 92 deletions pgml-sdks/pgml/src/collection.rs
@@ -20,20 +20,17 @@ use crate::filter_builder::FilterBuilder;
 use crate::search_query_builder::build_search_query;
 use crate::vector_search_query_builder::build_vector_search_query;
 use crate::{
-    get_or_initialize_pool, models,
-    multi_field_pipeline::MultiFieldPipeline,
-    order_by_builder, queries, query_builder,
+    get_or_initialize_pool, models, order_by_builder,
+    pipeline::Pipeline,
+    queries, query_builder,
     query_builder::QueryBuilder,
     splitter::Splitter,
     types::{DateTime, IntoTableNameAndSchema, Json, SIden, TryToNumeric},
     utils,
 };
 
 #[cfg(feature = "python")]
-use crate::{
-    multi_field_pipeline::MultiFieldPipelinePython, query_builder::QueryBuilderPython,
-    types::JsonPython,
-};
+use crate::{pipeline::PipelinePython, query_builder::QueryBuilderPython, types::JsonPython};
 
 /// Our project tasks
 #[derive(Debug, Clone)]
@@ -238,7 +235,7 @@ impl Collection {
             // Splitters table is not unique to a collection or pipeline. It exists in the pgml schema
             Splitter::create_splitters_table(&mut transaction).await?;
             self.create_documents_table(&mut transaction).await?;
-            MultiFieldPipeline::create_multi_field_pipelines_table(
+            Pipeline::create_pipelines_table(
                 &collection_database_data.project_info,
                 &mut transaction,
             )
@@ -272,7 +269,7 @@ impl Collection {
     /// }
     /// ```
     #[instrument(skip(self))]
-    pub async fn add_pipeline(&mut self, pipeline: &mut MultiFieldPipeline) -> anyhow::Result<()> {
+    pub async fn add_pipeline(&mut self, pipeline: &mut Pipeline) -> anyhow::Result<()> {
         // The flow for this function:
         // 1. Create collection if it does not exists
         // 2. Create the pipeline if it does not exist and add it to the collection.pipelines table with ACTIVE = TRUE
@@ -314,7 +311,7 @@ impl Collection {
     /// }
     /// ```
     #[instrument(skip(self))]
-    pub async fn remove_pipeline(&mut self, pipeline: &MultiFieldPipeline) -> anyhow::Result<()> {
+    pub async fn remove_pipeline(&mut self, pipeline: &Pipeline) -> anyhow::Result<()> {
         // The flow for this function:
         // 1. Create collection if it does not exist
         // 2. Begin a transaction
@@ -364,10 +361,7 @@ impl Collection {
     /// }
     /// ```
     #[instrument(skip(self))]
-    pub async fn enable_pipeline(
-        &mut self,
-        pipeline: &mut MultiFieldPipeline,
-    ) -> anyhow::Result<()> {
+    pub async fn enable_pipeline(&mut self, pipeline: &mut Pipeline) -> anyhow::Result<()> {
         // The flow for this function:
         // 1. Set ACTIVE = TRUE for the pipeline in collection.pipelines
         // 2. Resync the pipeline
@@ -400,7 +394,7 @@ impl Collection {
     /// }
     /// ```
     #[instrument(skip(self))]
-    pub async fn disable_pipeline(&self, pipeline: &MultiFieldPipeline) -> anyhow::Result<()> {
+    pub async fn disable_pipeline(&self, pipeline: &Pipeline) -> anyhow::Result<()> {
         // The flow for this function:
         // 1. Set ACTIVE = FALSE for the pipeline in collection.pipelines
         sqlx::query(&query_builder!(
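The four hunks above cover the pipeline lifecycle on a collection. A short hedged sketch of the call sites, reusing `collection` and `pipeline` from the sketch above (mutability follows the signatures in this diff):

// Illustrative sketch, not part of this diff.
// Stop running the pipeline on future upserts; already-synced data stays put.
collection.disable_pipeline(&pipeline).await?;

// Turn it back on; per the flow comment above, this also resyncs the pipeline.
collection.enable_pipeline(&mut pipeline).await?;

// Remove the pipeline and everything it generated.
collection.remove_pipeline(&pipeline).await?;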
@@ -464,7 +458,7 @@ impl Collection {
         // The flow for this function
         // 1. Create the collection if it does not exist
         // 2. Get all pipelines where ACTIVE = TRUE
-        // 4. Foreach document
+        // 4. Foreach n documents
         //    -> Begin a transaction returning the old document if it existed
         //    -> Insert the document
         //    -> Foreach pipeline check if we need to resync the document and if so sync the document
@@ … @@
         let progress_bar = utils::default_progress_bar(documents.len() as u64);
         progress_bar.println("Upserting Documents...");
 
-        for document in documents {
+        let batch_size = args
+            .get("batch_size")
+            .map(TryToNumeric::try_to_u64)
+            .unwrap_or(Ok(10))?;
+
+        for batch in documents.chunks(batch_size as usize) {
             let mut transaction = pool.begin().await?;
-            let id = document
-                .get("id")
-                .context("`id` must be a key in document")?
-                .to_string();
-            let md5_digest = md5::compute(id.as_bytes());
-            let source_uuid = uuid::Uuid::from_slice(&md5_digest.0)?;
-
-            let query = if args
-                .get("merge")
-                .map(|v| v.as_bool().unwrap_or(false))
-                .unwrap_or(false)
-            {
-                query_builder!(
-                    "WITH prev AS (SELECT document FROM %s WHERE source_uuid = $1) INSERT INTO %s (source_uuid, document) VALUES ($1, $2) ON CONFLICT (source_uuid) DO UPDATE SET document = %s.document || EXCLUDED.document RETURNING id, (SELECT document FROM prev)",
-                    self.documents_table_name,
-                    self.documents_table_name,
-                    self.documents_table_name
-                )
-            } else {
-                query_builder!(
-                    "WITH prev AS (SELECT document FROM %s WHERE source_uuid = $1) INSERT INTO %s (source_uuid, document) VALUES ($1, $2) ON CONFLICT (source_uuid) DO UPDATE SET document = EXCLUDED.document RETURNING id, (SELECT document FROM prev)",
-                    self.documents_table_name,
-                    self.documents_table_name
-                )
-            };
-            let (document_id, previous_document): (i64, Option<Json>) = sqlx::query_as(&query)
-                .bind(source_uuid)
-                .bind(&document)
-                .fetch_one(&mut *transaction)
-                .await?;
-
+            let mut dp = vec![];
+            for document in batch {
+                let id = document
+                    .get("id")
+                    .context("`id` must be a key in document")?
+                    .to_string();
+                let md5_digest = md5::compute(id.as_bytes());
+                let source_uuid = uuid::Uuid::from_slice(&md5_digest.0)?;
+
+                let query = if args
+                    .get("merge")
+                    .map(|v| v.as_bool().unwrap_or(false))
+                    .unwrap_or(false)
+                {
+                    query_builder!(
+                        "WITH prev AS (SELECT document FROM %s WHERE source_uuid = $1) INSERT INTO %s (source_uuid, document) VALUES ($1, $2) ON CONFLICT (source_uuid) DO UPDATE SET document = %s.document || EXCLUDED.document RETURNING id, (SELECT document FROM prev)",
+                        self.documents_table_name,
+                        self.documents_table_name,
+                        self.documents_table_name
+                    )
+                } else {
+                    query_builder!(
+                        "WITH prev AS (SELECT document FROM %s WHERE source_uuid = $1) INSERT INTO %s (source_uuid, document) VALUES ($1, $2) ON CONFLICT (source_uuid) DO UPDATE SET document = EXCLUDED.document RETURNING id, (SELECT document FROM prev)",
+                        self.documents_table_name,
+                        self.documents_table_name
+                    )
+                };
+                let (document_id, previous_document): (i64, Option<Json>) = sqlx::query_as(&query)
+                    .bind(source_uuid)
+                    .bind(document)
+                    .fetch_one(&mut *transaction)
+                    .await?;
+                dp.push((document_id, document, previous_document));
+            }
+
             let transaction = Arc::new(Mutex::new(transaction));
             if !pipelines.is_empty() {
                 use futures::stream::StreamExt;
                 futures::stream::iter(&mut pipelines)
                     // Need this map to get around moving the transaction
-                    .map(|pipeline| {
-                        (
-                            pipeline,
-                            previous_document.clone(),
-                            document.clone(),
-                            transaction.clone(),
-                        )
+                    .map(|pipeline| (pipeline, dp.clone(), transaction.clone()))
+                    .for_each_concurrent(10, |(pipeline, db, transaction)| async move {
+                        let parsed_schema = pipeline
+                            .get_parsed_schema()
+                            .await
+                            .expect("Error getting parsed schema for pipeline");
+                        let ids_to_run_on: Vec<i64> = db
+                            .into_iter()
+                            .filter(|(_, document, previous_document)| match previous_document {
+                                Some(previous_document) => parsed_schema
+                                    .iter()
+                                    .any(|(key, _)| document[key] != previous_document[key]),
+                                None => true,
+                            })
+                            .map(|(document_id, _, _)| document_id)
+                            .collect();
+                        pipeline
+                            .sync_documents(ids_to_run_on, transaction)
+                            .await
+                            .expect("Failed to execute pipeline");
                     })
-                    .for_each_concurrent(
-                        10,
-                        |(pipeline, previous_document, document, transaction)| async move {
-                            match previous_document {
-                                Some(previous_document) => {
-                                    // Can unwrap here as we know it has parsed schema from the create_table call above
-                                    let should_run =
-                                        pipeline.parsed_schema.as_ref().unwrap().iter().any(
-                                            |(key, _)| document[key] != previous_document[key],
-                                        );
-                                    if should_run {
-                                        pipeline
-                                            .sync_document(document_id, transaction)
-                                            .await
-                                            .expect("Failed to execute pipeline");
-                                    }
-                                }
-                                None => {
-                                    pipeline
-                                        .sync_document(document_id, transaction)
-                                        .await
-                                        .expect("Failed to execute pipeline");
-                                }
-                            }
-                        },
-                    )
                     .await;
             }
 
             Arc::into_inner(transaction)
                 .context("Error transaction dangling")?
                 .into_inner()
                 .commit()
                 .await?;
             progress_bar.inc(1);
         }
 
         progress_bar.println("Done Upserting Documents\n");
         progress_bar.finish();
         Ok(())
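The new loop is driven by two optional args visible in this hunk: `batch_size` (documents per transaction, default 10) and `merge` (JSONB `||` merge into an existing document instead of replacing it). A hedged sketch of a call site follows, with hypothetical documents; the `Some(args)` wrapper is assumed from the SDK's usual `Option<Json>` args parameter:

// Illustrative sketch, not part of this diff.
use serde_json::json;

// Every document must carry an `id`; its md5 becomes the row's source_uuid.
let documents: Vec<pgml::types::Json> = vec![
    json!({ "id": "doc_1", "title": "Hello", "body": "First version" }).into(),
    json!({ "id": "doc_2", "title": "World", "body": "Another document" }).into(),
];

collection
    .upsert_documents(
        documents,
        Some(
            json!({
                "batch_size": 100, // one transaction (and one pipeline sync) per 100 documents
                "merge": true      // merge fields into an existing document rather than replace it
            })
            .into(),
        ),
    )
    .await?;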
@@ -679,7 +670,7 @@ impl Collection {
     pub async fn search(
         &mut self,
         query: Json,
-        pipeline: &mut MultiFieldPipeline,
+        pipeline: &mut Pipeline,
     ) -> anyhow::Result<Vec<Json>> {
         let pool = get_or_initialize_pool(&self.database_url).await?;
         let (built_query, values) = build_search_query(self, query.clone(), pipeline).await?;
@@ -719,7 +710,7 @@
     pub async fn search_local(
         &self,
         query: Json,
-        pipeline: &MultiFieldPipeline,
+        pipeline: &Pipeline,
     ) -> anyhow::Result<Vec<Json>> {
         let pool = get_or_initialize_pool(&self.database_url).await?;
         let (built_query, values) = build_search_query(self, query.clone(), pipeline).await?;
@@ -753,7 +744,7 @@
     pub async fn vector_search(
         &mut self,
         query: Json,
-        pipeline: &mut MultiFieldPipeline,
+        pipeline: &mut Pipeline,
     ) -> anyhow::Result<Vec<Json>> {
         let pool = get_or_initialize_pool(&self.database_url).await?;
 
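All three search entry points now take the unified `Pipeline`. Here is a sketch of calling `search` and `vector_search`; the query shapes below are assumed from the new multi-field design (and the title boosting mentioned in the commit history), so treat keys like `semantic_search`, `fields`, and `boost` as illustrative rather than authoritative:

// Illustrative sketch, not part of this diff.
use serde_json::json;

// Document search with a boosted title, as the site-search commits describe.
let results = collection
    .search(
        json!({
            "query": {
                "semantic_search": {
                    "title": { "query": "postgresml", "boost": 2.0 },
                    "body": { "query": "how do I run vector search?" }
                }
            },
            "limit": 10
        })
        .into(),
        &mut pipeline,
    )
    .await?;

// Plain vector search over a single embedded field.
let hits = collection
    .vector_search(
        json!({
            "query": {
                "fields": {
                    "body": { "query": "how do I run vector search?" }
                }
            },
            "limit": 5
        })
        .into(),
        &mut pipeline,
    )
    .await?;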
@@ -869,7 +860,7 @@ impl Collection {
     /// }
     /// ```
     #[instrument(skip(self))]
-    pub async fn get_pipelines(&mut self) -> anyhow::Result<Vec<MultiFieldPipeline>> {
+    pub async fn get_pipelines(&mut self) -> anyhow::Result<Vec<Pipeline>> {
         self.verify_in_database(false).await?;
         let project_info = &self
             .database_data
@@ … @@
         pipelines
             .into_iter()
             .map(|p| {
-                let mut p: MultiFieldPipeline = p.try_into()?;
+                let mut p: Pipeline = p.try_into()?;
                 p.set_project_info(project_info.clone());
                 Ok(p)
             })
@@ … @@
     /// }
     /// ```
     #[instrument(skip(self))]
-    pub async fn get_pipeline(&mut self, name: &str) -> anyhow::Result<MultiFieldPipeline> {
+    pub async fn get_pipeline(&mut self, name: &str) -> anyhow::Result<Pipeline> {
         self.verify_in_database(false).await?;
         let project_info = &self
             .database_data
@@ … @@
             .bind(name)
             .fetch_one(&pool)
             .await?;
-        let mut pipeline: MultiFieldPipeline = pipeline.try_into()?;
+        let mut pipeline: Pipeline = pipeline.try_into()?;
         pipeline.set_project_info(project_info.clone());
         Ok(pipeline)
     }
@@ -1039,10 +1030,7 @@ impl Collection {
         Ok(())
     }
 
-    pub async fn generate_er_diagram(
-        &mut self,
-        pipeline: &mut MultiFieldPipeline,
-    ) -> anyhow::Result<String> {
+    pub async fn generate_er_diagram(&mut self, pipeline: &mut Pipeline) -> anyhow::Result<String> {
         self.verify_in_database(false).await?;
         pipeline.verify_in_database(false).await?;
 