comment insert des données parellel dans trois tables différentes

J'ai une procédure stockée qui va insert la majeure partie des loggings, maintenant il y a une possibilité d'insert des données dans 3 tables en parallèle;

  • Première table insérant 1 million d'loggings.
  • Deuxième table insérant 1,5 million d'loggings.
  • Troisième table insérant 500k loggings

Selon mes connaissances – l'insertion de la procédure se passe l'une après l'autre.

Alors, comment puis-je implémenter le chargement en parallèle?

Les instructions s'exécutent de manière synchrone dans un lot T-SQL. Pour exécuter plusieurs instructions de manière asynchronous et parallèle à partir d'une procédure stockée, vous devez utiliser plusieurs connections de database simultanées. Notez que la partie délicate de l'exécution asynchronous détermine non seulement si toutes les tâches sont terminées, mais aussi si elles ont réussi ou échoué.

Méthode 1: package SSIS

Créez un package SSIS pour exécuter les 3 instructions SQL en parallèle. Dans SQL 2012 et versions ultérieures, exécutez le package à l'aide des procédures stockées du catalogue SSIS. Pré-SQL 2012, vous devez créer un travail SQL Agent pour le package et le lancer avec sp_start_job.

Vous devez vérifier l'état de l'exécution de SSIS ou l'état du travail de l'agent SQL pour déterminer l'achèvement et le résultat du succès / de l'échec.

Méthode 2: Powershell et Agent SQL

Exécutez un travail SQL Agent qui exécute un script Powershell qui exécute les requêtes en parallèle à l'aide de travaux d'arrière-plan Powershell (command Start-Job). Le script peut renvoyer un code de sortie, zéro pour la réussite et différent de zéro pour l'échec, afin que l'Agent SQL puisse déterminer s'il a réussi. Vérifiez l'état du travail de l'agent SQL pour déterminer l'achèvement et le résultat du succès / de l'échec.

Méthode 3: plusieurs emplois de l'Agent SQL

Exécutez plusieurs tâches SQL Agent simultanément, chacune avec une étape de travail T-SQL contenant le script d'import. Vérifiez le statut du travail de l'agent SQL de chaque travail pour déterminer l'achèvement et le résultat du succès / de l'échec.

Méthode 4: Service Broker Utilisez un proc activé par queue pour exécuter les scripts d'import en parallèle. Cela peut être obtus si vous n'avez jamais utilisé Service Broker avant et il est important de suivre les templates approuvés. J'ai inclus un exemple pour vous aider à démarrer (replace THROW par RAISERROR pour pre-SQL 2012). La database doit avoir Service Broker activé, activé par défaut mais désactivé après une restauration ou une connection.

USE YourDatabase; Go --create proc that will be automatically executed (activated) when requests are waiting CREATE PROC dbo.ExecuteTSqlTask AS SET NOCOUNT ON; DECLARE @TSqlJobConversationHandle uniqueidentifier = NEWID() , @TSqlExecutionRequestMessage xml , @TSqlExecutionResultMessage xml , @TSqlExecutionResult varchar(10) , @TSqlExecutionResultDetails nvarchar(MAX) , @TSqlScript nvarchar(MAX) , @TSqlTaskName sysname , @RowsAffected int , @message_type_name sysname; WHILE 1 = 1 BEGIN --get the next task to execute WAITFOR ( RECEIVE TOP (1) @TSqlJobConversationHandle = conversation_handle , @TSqlExecutionRequestMessage = CAST(message_body AS xml) , @message_type_name = message_type_name FROM dbo.TSqlExecutionQueue ), TIMEOUT 1000; IF @@ROWCOUNT = 0 BEGIN --no work to do - exit BREAK; END; IF @message_type_name = N'TSqlExecutionRequest' BEGIN --get task name and script SELECT @TSqlTaskName = @TSqlExecutionRequestMessage.value('(/TSqlTaskName)[1]', 'sysname') , @TSqlScript = @TSqlExecutionRequestMessage.value('(/TSqlScript)[1]', 'nvarchar(MAX)'); --execute script BEGIN TRY EXEC sp_executesql @TSqlScript; SET @RowsAffected = @@ROWCOUNT; SET @TSqlExecutionResult = 'Completed'; SET @TSqlExecutionResultDetails = CAST(@RowsAffected as varchar(10)) + ' rows affected'; END TRY BEGIN CATCH SET @TSqlExecutionResult = 'Erred'; SET @TSqlExecutionResultDetails = 'Msg ' + CAST(ERROR_NUMBER() AS varchar(10)) + ', Level ' + CAST(ERROR_SEVERITY() AS varchar(2)) + ', State ' + CAST(ERROR_STATE() AS varchar(10)) + ', Line ' + CAST(ERROR_LINE() AS varchar(10)) + ': ' + ERROR_MESSAGE(); END CATCH; --send execution result back to initiator SET @TSqlExecutionResultMessage = '<TSqlTaskName /><TSqlExecutionResult /><TSqlExecutionResultDetails />'; SET @TSqlExecutionResultMessage.modify('insert text {sql:variable("@TSqlTaskName")} into (/TSqlTaskName)[1] '); SET @TSqlExecutionResultMessage.modify('insert text {sql:variable("@TSqlExecutionResult")} into (/TSqlExecutionResult)[1] '); SET @TSqlExecutionResultMessage.modify('insert text {sql:variable("@TSqlExecutionResultDetails")} into (/TSqlExecutionResultDetails)[1] '); SEND ON CONVERSATION @TSqlJobConversationHandle MESSAGE TYPE TSqlExecutionResult (@TSqlExecutionResultMessage); END ELSE BEGIN IF @message_type_name = N'TSqlJobComplete' BEGIN --service has ended conversation so we're not going to get any more execution requests END CONVERSATION @TSqlJobConversationHandle; END ELSE BEGIN END CONVERSATION @TSqlJobConversationHandle WITH ERROR = 1 DESCRIPTION = 'Unexpected message type received by ExecuteTSqlTask'; RAISERROR('Unexpected message type received (%s) by ExecuteTSqlTask', 16, 1, @message_type_name); END; END; END; GO CREATE QUEUE dbo.TSqlResultQueue; CREATE QUEUE dbo.TSqlExecutionQueue WITH STATUS=ON, ACTIVATION ( STATUS = ON , PROCEDURE_NAME = dbo.ExecuteTSqlTask , MAX_QUEUE_READERS = 3 --max number of concurrent activated proc instances , EXECUTE AS OWNER ); CREATE MESSAGE TYPE TSqlExecutionRequest VALIDATION = WELL_FORMED_XML; CREATE MESSAGE TYPE TSqlExecutionResult VALIDATION = WELL_FORMED_XML; CREATE MESSAGE TYPE TSqlJobComplete VALIDATION = WELL_FORMED_XML; CREATE CONTRACT TSqlExecutionContract ( TSqlExecutionRequest SENT BY INITIATOR , TSqlJobComplete SENT BY INITIATOR , TSqlExecutionResult SENT BY TARGET ); CREATE SERVICE TSqlJobService ON QUEUE dbo.TSqlResultQueue ([TSqlExecutionContract]); CREATE SERVICE TSqlExecutorService ON QUEUE dbo.TSqlExecutionQueue ([TSqlExecutionContract]); GO CREATE PROC dbo.ExecuteParallelImportScripts AS SET NOCOUNT ON; DECLARE @TSqlJobConversationHandle uniqueidentifier , @TSqlExecutionRequestMessage xml , @TSqlExecutionResultMessage xml , @TSqlExecutionResult varchar(10) , @TSqlExecutionResultDetails nvarchar(MAX) , @TSqlTaskName sysname , @CompletedCount int = 0 , @ErredCount int = 0 , @message_type_name sysname; DECLARE @TsqlTask TABLE( TSqlTaskName sysname NOT NULL PRIMARY KEY , TSqlScript nvarchar(MAX) NOT NULL ); BEGIN TRY --insert a row for each import task INSERT INTO @TsqlTask(TSqlTaskName, TSqlScript) VALUES(N'ImportScript1', N'INSERT INTO dbo.Table1 SELECT * FROM dbo.Table1Staging;'); INSERT INTO @TsqlTask(TSqlTaskName, TSqlScript) VALUES(N'ImportScript2', N'INSERT INTO dbo.Table2 SELECT * FROM dbo.Table2Staging;'); INSERT INTO @TsqlTask(TSqlTaskName, TSqlScript) VALUES(N'ImportScript3', N'INSERT INTO dbo.Table3 SELECT * FROM dbo.Table3Staging;'); --start a conversation for this import process BEGIN DIALOG CONVERSATION @TsqlJobConversationHandle FROM SERVICE TSqlJobService TO SERVICE 'TSqlExecutorService', 'CURRENT DATABASE' ON CONTRACT TSqlExecutionContract WITH ENCRYPTION = OFF; --send import tasks to executor service for parallel execution DECLARE JobTasks CURSOR LOCAL FAST_FORWARD FOR SELECT (SELECT TSqlTaskName, TSqlScript FROM @TsqlTask AS task WHERE task.TSqlTaskName = job.TSqlTaskName FOR XML PATH(''), TYPE) AS TSqlExecutionRequest FROM @TsqlTask AS job; OPEN JobTasks; WHILE 1 = 1 BEGIN FETCH NEXT FROM JobTasks INTO @TSqlExecutionRequestMessage; IF @@FETCH_STATUS = -1 BREAK; SEND ON CONVERSATION @TSqlJobConversationHandle MESSAGE TYPE TSqlExecutionRequest (@TSqlExecutionRequestMessage); END; CLOSE JobTasks; DEALLOCATE JobTasks; --get each parallel task execution result until all are complete WHILE 1 = 1 BEGIN --get next task result WAITFOR ( RECEIVE TOP (1) @TSqlExecutionResultMessage = CAST(message_body AS xml) , @message_type_name = message_type_name FROM dbo.TSqlResultQueue WHERE conversation_handle = @TSqlJobConversationHandle ), TIMEOUT 1000; IF @@ROWCOUNT <> 0 BEGIN IF @message_type_name = N'TSqlExecutionResult' BEGIN --get result of import script execution SELECT @TSqlTaskName = @TSqlExecutionResultMessage.value('(/TSqlTaskName)[1]', 'sysname') , @TSqlExecutionResult = @TSqlExecutionResultMessage.value('(/TSqlExecutionResult)[1]', 'varchar(10)') , @TSqlExecutionResultDetails = COALESCE(@TSqlExecutionResultMessage.value('(/TSqlExecutionResultDetails)[1]', 'nvarchar(MAX)'), N''); RAISERROR('Import task %s %s: %s', 0, 0, @TSqlTaskName, @TSqlExecutionResult, @TSqlExecutionResultDetails) WITH NOWAIT; IF @TSqlExecutionResult = 'Completed' BEGIN SET @CompletedCount += 1; END ELSE BEGIN SET @ErredCount += 1; END; --remove task from tracking table after completion DELETE FROM @TSqlTask WHERE TSqlTaskName = @TSqlTaskName; IF NOT EXISTS(SELECT 1 FROM @TsqlTask) BEGIN --all tasks are done - send TSqlJobComplete message to instruct executor service to end conversation SEND ON CONVERSATION @TSqlJobConversationHandle MESSAGE TYPE TSqlJobComplete; END END ELSE BEGIN IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' BEGIN --executor service has ended conversation so we're done END CONVERSATION @TSqlJobConversationHandle; BREAK; END ELSE BEGIN END CONVERSATION @TSqlJobConversationHandle WITH ERROR = 1 DESCRIPTION = 'Unexpected message type received by ExecuteParallelInserts'; RAISERROR('Unexpected message type received (%s) by ExecuteParallelInserts', 16, 1, @message_type_name); END; END END; END; RAISERROR('Import processing completed. CompletedCount=%d, ErredCount=%d.', 0, 0, @CompletedCount, @ErredCount); END TRY BEGIN CATCH THROW; END CATCH; GO --execute import scripts in parallel EXEC dbo.ExecuteParallelImportScripts; GO 

Vous pouvez essayer de créer trois tâches et exécuter des scripts d'insertion en parallèle comme ci-dessous:

 DECLARE @jobId BINARY(16) EXEC msdb.dbo.sp_add_job @job_name=N'Job1', @enabled=1, @description=N'No description available.', @job_id = @jobId OUTPUT EXEC msdb.dbo.sp_add_jobstep @job_id=@jobId, @step_name=N'Insert into First Table', @step_id=1, @cmdexec_success_code=0, @on_success_action=1, @on_success_step_id=0, @on_fail_action=2, @on_fail_step_id=0, @retry_attempts=0, @retry_interval=0, @os_run_priority=0, @subsystem=N'TSQL', @command=N'--Insert script for first table', @database_name=N'Test', @flags=0 EXEC msdb.dbo.sp_add_jobserver @job_id = @jobId, @server_name = N'(local)' GO DECLARE @jobId BINARY(16) EXEC msdb.dbo.sp_add_job @job_name=N'Job2', @enabled=1, @description=N'No description available.', @job_id = @jobId OUTPUT EXEC msdb.dbo.sp_add_jobstep @job_id=@jobId, @step_name=N'Insert into second Table', @step_id=1, @cmdexec_success_code=0, @on_success_action=1, @on_success_step_id=0, @on_fail_action=2, @on_fail_step_id=0, @retry_attempts=0, @retry_interval=0, @os_run_priority=0, @subsystem=N'TSQL', @command=N'--Insert script for second table', @database_name=N'Test', @flags=0 EXEC msdb.dbo.sp_add_jobserver @job_id = @jobId, @server_name = N'(local)' GO DECLARE @jobId BINARY(16) EXEC msdb.dbo.sp_add_job @job_name=N'Job3', @enabled=1, @description=N'No description available.', @job_id = @jobId OUTPUT EXEC msdb.dbo.sp_add_jobstep @job_id=@jobId, @step_name=N'Insert into Third Table', @step_id=1, @cmdexec_success_code=0, @on_success_action=1, @on_success_step_id=0, @on_fail_action=2, @on_fail_step_id=0, @retry_attempts=0, @retry_interval=0, @os_run_priority=0, @subsystem=N'TSQL', @command=N'--Insert script for third table', @database_name=N'Test', @flags=0 EXEC msdb.dbo.sp_add_jobserver @job_id = @jobId, @server_name = N'(local)' GO EXEC msdb.dbo.sp_start_job N'Job1' ; --All will execute in parallel EXEC msdb.dbo.sp_start_job N'Job2' ; EXEC msdb.dbo.sp_start_job N'Job3' ; 

En supposant que vous souhaitiez avoir la même valeur de date d'insertion pour toutes les insertions, définissez un paramètre de date défini sur la date actuelle comme indiqué.

 DECLARE @InsertDate as date SET @InsertDate = GetDate() 

Ensuite, passez le paramètre date d'insertion dans votre procédure stockée d'insertion et mettez à jour cette procédure stockée en conséquence pour utiliser cette input. Cela garantira que la même valeur de date d'insertion sera utilisée pour tous les inserts.

 EXEC dbo.InsertTables123 @p1 = @InsertDate 

Le paramètre d'input @InsertDate peut également être affecté manuellement si autre chose que la date actuelle si nécessaire.

Pour votre procédure, je suppose que vous avez tableName et l'location du file en tant que parameters.

Si vous avez un gros file qui a 3 millions d'loggings, vous devez split le file en 3 petits files (Si vous connaissez une autre langue que sql), après cela, vous pouvez ouvrir 3 consoles de servers Sql en command line, et appelez la procédure dans chaque console. Cela rendra l'insertion parallèle. Ou vous connaissez d'autres langages de programmation, vous pouvez utiliser plusieurs threads pour appeler la procédure.

Les trois arrays sont-ils identiques en termes de structure et de contenu? Si oui, utilisez la réplication transactionnelle / de fusion

Vous pouvez également créer un triggersur sur la première table à insert dans la deuxième et la troisième table