@@ -92,6 +92,10 @@ typedef struct SubOpts
9292} SubOpts ;
9393
9494static List * fetch_table_list (WalReceiverConn * wrconn , List * publications );
95+ static void check_publications_origin (WalReceiverConn * wrconn ,
96+ List * publications , bool copydata ,
97+ char * origin , Oid * subrel_local_oids ,
98+ int subrel_count , char * subname );
9599static void check_duplicates_in_publist (List * publist , Datum * datums );
96100static List * merge_publications (List * oldpublist , List * newpublist , bool addpub , const char * subname );
97101static void ReportSlotConnectionError (List * rstates , Oid subid , char * slotname , char * err );
@@ -680,6 +684,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
680684 PG_TRY ();
681685 {
682686 check_publications (wrconn , publications );
687+ check_publications_origin (wrconn , publications , opts .copy_data ,
688+ opts .origin , NULL , 0 , stmt -> subname );
683689
684690 /*
685691 * Set sync state based on if we were asked to do data copy or
@@ -786,6 +792,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
786792 ListCell * lc ;
787793 int off ;
788794 int remove_rel_len ;
795+ int subrel_count ;
789796 Relation rel = NULL ;
790797 typedef struct SubRemoveRels
791798 {
@@ -815,28 +822,33 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
815822
816823 /* Get local table list. */
817824 subrel_states = GetSubscriptionRelations (sub -> oid , false);
825+ subrel_count = list_length (subrel_states );
818826
819827 /*
820828 * Build qsorted array of local table oids for faster lookup. This can
821829 * potentially contain all tables in the database so speed of lookup
822830 * is important.
823831 */
824- subrel_local_oids = palloc (list_length ( subrel_states ) * sizeof (Oid ));
832+ subrel_local_oids = palloc (subrel_count * sizeof (Oid ));
825833 off = 0 ;
826834 foreach (lc , subrel_states )
827835 {
828836 SubscriptionRelState * relstate = (SubscriptionRelState * ) lfirst (lc );
829837
830838 subrel_local_oids [off ++ ] = relstate -> relid ;
831839 }
832- qsort (subrel_local_oids , list_length ( subrel_states ) ,
840+ qsort (subrel_local_oids , subrel_count ,
833841 sizeof (Oid ), oid_cmp );
834842
843+ check_publications_origin (wrconn , sub -> publications , copy_data ,
844+ sub -> origin , subrel_local_oids ,
845+ subrel_count , sub -> name );
846+
835847 /*
836848 * Rels that we want to remove from subscription and drop any slots
837849 * and origins corresponding to them.
838850 */
839- sub_remove_rels = palloc (list_length ( subrel_states ) * sizeof (SubRemoveRels ));
851+ sub_remove_rels = palloc (subrel_count * sizeof (SubRemoveRels ));
840852
841853 /*
842854 * Walk over the remote tables and try to match them to locally known
@@ -862,7 +874,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
862874 pubrel_local_oids [off ++ ] = relid ;
863875
864876 if (!bsearch (& relid , subrel_local_oids ,
865- list_length ( subrel_states ) , sizeof (Oid ), oid_cmp ))
877+ subrel_count , sizeof (Oid ), oid_cmp ))
866878 {
867879 AddSubscriptionRelState (sub -> oid , relid ,
868880 copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY ,
@@ -881,7 +893,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
881893 sizeof (Oid ), oid_cmp );
882894
883895 remove_rel_len = 0 ;
884- for (off = 0 ; off < list_length ( subrel_states ) ; off ++ )
896+ for (off = 0 ; off < subrel_count ; off ++ )
885897 {
886898 Oid relid = subrel_local_oids [off ];
887899
@@ -1784,6 +1796,117 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
17841796 table_close (rel , RowExclusiveLock );
17851797}
17861798
1799+ /*
1800+ * Check and log a warning if the publisher has subscribed to the same table
1801+ * from some other publisher. This check is required only if "copy_data = true"
1802+ * and "origin = none" for CREATE SUBSCRIPTION and
1803+ * ALTER SUBSCRIPTION ... REFRESH statements to notify the user that data
1804+ * having origin might have been copied.
1805+ *
1806+ * This check need not be performed on the tables that are already added
1807+ * because incremental sync for those tables will happen through WAL and the
1808+ * origin of the data can be identified from the WAL records.
1809+ *
1810+ * subrel_local_oids contains the list of relation oids that are already
1811+ * present on the subscriber.
1812+ */
1813+ static void
1814+ check_publications_origin (WalReceiverConn * wrconn , List * publications ,
1815+ bool copydata , char * origin , Oid * subrel_local_oids ,
1816+ int subrel_count , char * subname )
1817+ {
1818+ WalRcvExecResult * res ;
1819+ StringInfoData cmd ;
1820+ TupleTableSlot * slot ;
1821+ Oid tableRow [1 ] = {TEXTOID };
1822+ List * publist = NIL ;
1823+ int i ;
1824+
1825+ if (!copydata || !origin ||
1826+ (pg_strcasecmp (origin , LOGICALREP_ORIGIN_NONE ) != 0 ))
1827+ return ;
1828+
1829+ initStringInfo (& cmd );
1830+ appendStringInfoString (& cmd ,
1831+ "SELECT DISTINCT P.pubname AS pubname\n"
1832+ "FROM pg_publication P,\n"
1833+ " LATERAL pg_get_publication_tables(P.pubname) GPT\n"
1834+ " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n"
1835+ " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
1836+ "WHERE C.oid = GPT.relid AND P.pubname IN (" );
1837+ get_publications_str (publications , & cmd , true);
1838+ appendStringInfoString (& cmd , ")\n" );
1839+
1840+ /*
1841+ * In case of ALTER SUBSCRIPTION ... REFRESH, subrel_local_oids contains
1842+ * the list of relation oids that are already present on the subscriber.
1843+ * This check should be skipped for these tables.
1844+ */
1845+ for (i = 0 ; i < subrel_count ; i ++ )
1846+ {
1847+ Oid relid = subrel_local_oids [i ];
1848+ char * schemaname = get_namespace_name (get_rel_namespace (relid ));
1849+ char * tablename = get_rel_name (relid );
1850+
1851+ appendStringInfo (& cmd , "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n" ,
1852+ schemaname , tablename );
1853+ }
1854+
1855+ res = walrcv_exec (wrconn , cmd .data , 1 , tableRow );
1856+ pfree (cmd .data );
1857+
1858+ if (res -> status != WALRCV_OK_TUPLES )
1859+ ereport (ERROR ,
1860+ (errcode (ERRCODE_CONNECTION_FAILURE ),
1861+ errmsg ("could not receive list of replicated tables from the publisher: %s" ,
1862+ res -> err )));
1863+
1864+ /* Process tables. */
1865+ slot = MakeSingleTupleTableSlot (res -> tupledesc , & TTSOpsMinimalTuple );
1866+ while (tuplestore_gettupleslot (res -> tuplestore , true, false, slot ))
1867+ {
1868+ char * pubname ;
1869+ bool isnull ;
1870+
1871+ pubname = TextDatumGetCString (slot_getattr (slot , 1 , & isnull ));
1872+ Assert (!isnull );
1873+
1874+ ExecClearTuple (slot );
1875+ publist = list_append_unique (publist , makeString (pubname ));
1876+ }
1877+
1878+ /*
1879+ * Log a warning if the publisher has subscribed to the same table from
1880+ * some other publisher. We cannot know the origin of data during the
1881+ * initial sync. Data origins can be found only from the WAL by looking at
1882+ * the origin id.
1883+ *
1884+ * XXX: For simplicity, we don't check whether the table has any data or
1885+ * not. If the table doesn't have any data then we don't need to
1886+ * distinguish between data having origin and data not having origin so we
1887+ * can avoid logging a warning in that case.
1888+ */
1889+ if (publist )
1890+ {
1891+ StringInfo pubnames = makeStringInfo ();
1892+
1893+ /* Prepare the list of publication(s) for warning message. */
1894+ get_publications_str (publist , pubnames , false);
1895+ ereport (WARNING ,
1896+ errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
1897+ errmsg ("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin" ,
1898+ subname ),
1899+ errdetail_plural ("Subscribed publication %s is subscribing to other publications." ,
1900+ "Subscribed publications %s are subscribing to other publications." ,
1901+ list_length (publist ), pubnames -> data ),
1902+ errhint ("Verify that initial data copied from the publisher tables did not come from other origins." ));
1903+ }
1904+
1905+ ExecDropSingleTupleTableSlot (slot );
1906+
1907+ walrcv_clear_result (res );
1908+ }
1909+
17871910/*
17881911 * Get the list of tables which belong to specified publications on the
17891912 * publisher connection.
0 commit comments