Parallel tasks writing to database

Topics: Core
May 29, 2014 at 9:16 PM
Edited May 29, 2014 at 9:19 PM
Hello. I've created a background task. In Sweep method I call several tasks in parallel.
The tasks fetches data from a web service and stores it into the database using IRespository.
On my development machine I use IIS Express & SQL CE for Orchard database.
Unfortunately when the task is one there is no problem, but when the tasks are several I got the following error:

014-05-29 22:12:47,954 [71] NHibernate.Event.Default.AbstractFlushingEventListener - (null) - Could not synchronize database state with session
(null)
NHibernate.Exceptions.GenericADOException: could not update: [Smartsys.OpcXmlDa.AddressSpaceCache.Models.BrowseElementRecord#3352][SQL: UPDATE Smartsys_OpcXmlDa_AddressSpaceCache_BrowseElementRecord SET GatewayKey = ?, ItemPath = ?, ItemName = ?, Name = ?, IsItem = ?, HasChildren = ?, ParentId = ?, ParentItemPath = ?, ParentItemName = ?, ParentName = ?, LastUpdatedUTC = ? WHERE Id = ?] ---> System.Data.SqlServerCe.SqlCeLockTimeoutException: SQL Server Compact timed out waiting for a lock. The default lock time is 2000ms for devices and 5000ms for desktops. The default lock timeout can be increased in the connection string using the ssce: default lock timeout property. [ Session id = 22,Thread id = 8900,Process id = 17940,Table name = Smartsys_OpcXmlDa_AddressSpaceCache_BrowseElementRecord,Conflict type = u lock (x blocks),Resource = RID: 4087:1 ]
at System.Data.SqlServerCe.SqlCeCommand.ProcessResults(Int32 hr)
at System.Data.SqlServerCe.SqlCeCommand.ExecuteCommandText(IntPtr& pCursor, Boolean& isBaseTableCursor)
at System.Data.SqlServerCe.SqlCeCommand.ExecuteCommand(CommandBehavior behavior, String method, ResultSetOptions options)
at System.Data.SqlServerCe.SqlCeCommand.ExecuteNonQuery()
at NHibernate.AdoNet.AbstractBatcher.ExecuteNonQuery(IDbCommand cmd) in c:\Users\sebros\Documents\My Projects\nhibernate-core\src\NHibernate\AdoNet\AbstractBatcher.cs:line 203
at NHibernate.AdoNet.NonBatchingBatcher.AddToBatch(IExpectation expectation) in c:\Users\sebros\Documents\My Projects\nhibernate-core\src\NHibernate\AdoNet\NonBatchingBatcher.cs:line 40
at NHibernate.Persister.Entity.AbstractEntityPersister.Update(Object id, Object[] fields, Object[] oldFields, Object rowId, Boolean[] includeProperty, Int32 j, Object oldVersion, Object obj, SqlCommandInfo sql, ISessionImplementor session) in c:\Users\sebros\Documents\My Projects\nhibernate-core\src\NHibernate\Persister\Entity\AbstractEntityPersister.cs:line 2799
--- End of inner exception stack trace ---
at NHibernate.Persister.Entity.AbstractEntityPersister.Update(Object id, Object[] fields, Object[] oldFields, Object rowId, Boolean[] includeProperty, Int32 j, Object oldVersion, Object obj, SqlCommandInfo sql, ISessionImplementor session) in c:\Users\sebros\Documents\My Projects\nhibernate-core\src\NHibernate\Persister\Entity\AbstractEntityPersister.cs:line 2811
at NHibernate.Persister.Entity.AbstractEntityPersister.UpdateOrInsert(Object id, Object[] fields, Object[] oldFields, Object rowId, Boolean[] includeProperty, Int32 j, Object oldVersion, Object obj, SqlCommandInfo sql, ISessionImplementor session) in c:\Users\sebros\Documents\My Projects\nhibernate-core\src\NHibernate\Persister\Entity\AbstractEntityPersister.cs:line 2702
at NHibernate.Persister.Entity.AbstractEntityPersister.Update(Object id, Object[] fields, Int32[] dirtyFields, Boolean hasDirtyCollection, Object[] oldFields, Object oldVersion, Object obj, Object rowId, ISessionImplementor session) in c:\Users\sebros\Documents\My Projects\nhibernate-core\src\NHibernate\Persister\Entity\AbstractEntityPersister.cs:line 3010
at NHibernate.Action.EntityUpdateAction.Execute() in c:\Users\sebros\Documents\My Projects\nhibernate-core\src\NHibernate\Action\EntityUpdateAction.cs:line 79
at NHibernate.Engine.ActionQueue.Execute(IExecutable executable) in c:\Users\sebros\Documents\My Projects\nhibernate-core\src\NHibernate\Engine\ActionQueue.cs:line 136
at NHibernate.Engine.ActionQueue.ExecuteActions(IList list) in c:\Users\sebros\Documents\My Projects\nhibernate-core\src\NHibernate\Engine\ActionQueue.cs:line 126
at NHibernate.Engine.ActionQueue.ExecuteActions() in c:\Users\sebros\Documents\My Projects\nhibernate-core\src\NHibernate\Engine\ActionQueue.cs:line 170
at NHibernate.Event.Default.AbstractFlushingEventListener.PerformExecutions(IEventSource session) in c:\Users\sebros\Documents\My Projects\nhibernate-core\src\NHibernate\Event\Default\AbstractFlushingEventListener.cs:line 241

What is the best practice to manipulate data when using parallel tasks.
May 29, 2014 at 9:18 PM
Edited May 29, 2014 at 9:21 PM
I appreciate any kind of help or suggestions. Thank you.
Here is the code snippet:

    public class BackgroundTaskHandler : IBackgroundTask
    {
        private readonly ILifetimeScope _lifetimeScope;

        private enum TaskStatus
        {
            None,
            Pending,
            Processing,
            Done,
            Failed
        }

        private class JobContext
        {
            public Task Job { get; set; }
            public TimeSpan ElapsedEtime { get; set; }
            public TaskStatus Status { get; set; }
            public Exception Exception { get; set; }
            public DateTime LastTime { get; set; }
            public int TaskId { get; set; }
            public string GatewayKey { get; set; }
            public string ItemPath { get; set; }
            public string ItemName { get; set; }
        }

        public ILogger Logger { get; set; }

        public BackgroundTaskHandler(ILifetimeScope lifetimeScope)
        {
            this._lifetimeScope = lifetimeScope;
            Logger = NullLogger.Instance;
        }

        private JobContext Process(JobContext jobContext)
        {
            jobContext.Job = Task.Factory.StartNew(() =>
            {
                using (var scope = _lifetimeScope.BeginLifetimeScope())
                {
                    var scheduledTaskRecordService = scope.Resolve<IScheduledTaskRecordService>();
                    scheduledTaskRecordService.SetStatus(jobContext.TaskId, TaskStatus.Processing.ToString());
                }

                using (var scope = _lifetimeScope.BeginLifetimeScope())
                {
                    var transactionManager = scope.Resolve<ITransactionManager>();
                    var gatewayConfigurationRecordService = scope.Resolve<IGatewayConfigurationRecordService>();
                    jobContext.LastTime = DateTime.Now;
                    Stopwatch chrono = new Stopwatch();
                    chrono.Start();
                    try
                    {
                        //transactionManager.RequireNew();
                        jobContext.Status = TaskStatus.Processing;

                        var gatewayRecord = gatewayConfigurationRecordService.GetRecordByGatewayKey(jobContext.GatewayKey);
                        var client = new OpcXmlDaServiceClient(gatewayRecord);

                        Browse(client, jobContext.GatewayKey, jobContext.ItemPath, jobContext.ItemName);

                        jobContext.Status = TaskStatus.Done;
                    }
                    catch (Exception e)
                    {
                        jobContext.Status = TaskStatus.Failed;
                        jobContext.Exception = e;
                        this.Logger.Error(e, e.Message);
                    }
                    finally
                    {
                        chrono.Stop();
                        jobContext.ElapsedEtime = chrono.Elapsed;
                    }
                }
            });

            return jobContext;
        }

        private void Browse(OpcXmlDaServiceClient client, string gatewayKey, string itemPath, string itemName, BrowseElementRecord parentRecord = null)
        {
            string continuationPoint = null;
            BrowseElement[] browseElements = null;
            OPCError[] opcErrors = null;
            bool moreElements = false;
            var rb = client.Browse(null, null, null, itemPath, itemName, ref continuationPoint, 0, browseFilter.all, null, null, false, false, false, out browseElements, out opcErrors, out moreElements);

            if (browseElements != null && browseElements.Length > 0)
            {
                var branches = new List<Tuple<BrowseElement, BrowseElementRecord>>();

                using (var scope = _lifetimeScope.BeginLifetimeScope())
                {
                    var browseElementRecordService = scope.Resolve<IBrowseElementRecordService>();
                    var transcationManager = scope.Resolve<ITransactionManager>();
                    transcationManager.RequireNew();
                    foreach (var element in browseElements)
                    {
                        var record = browseElementRecordService.GetRecord(gatewayKey, element.ItemPath, element.ItemName);
                        if (record == null)
                            record = browseElementRecordService.CreateRecord();

                        record.GatewayKey = gatewayKey;
                        record.ItemPath = element.ItemPath;
                        record.ItemName = element.ItemName;
                        record.Name = element.Name;
                        record.IsItem = element.IsItem;
                        record.HasChildren = element.HasChildren;
                        record.ParentItemPath = parentRecord != null ? parentRecord.ItemPath : null;
                        record.ParentItemName = parentRecord != null ? parentRecord.ItemName : null;
                        record.ParentName = parentRecord != null ? parentRecord.Name : null;
                        record.ParentId = parentRecord != null ? parentRecord.Id : 0;
                        record.LastUpdatedUTC = DateTime.UtcNow;
                        browseElementRecordService.UpdateRecord(record);
                        if (!element.IsItem)
                            branches.Add(new Tuple<BrowseElement, BrowseElementRecord>(element, parentRecord));
                    }
                }
               
                foreach (var branch in branches)
                {
                    try
                    {
                        Browse(client, gatewayKey, branch.Item1.ItemPath, branch.Item1.ItemName, branch.Item2);
                    }
                    catch (Exception e)
                    {
                        Logger.Error(e, "ItemPath={0}, ItemName={1}, GatewayKey={2}", branch.Item1.ItemPath ?? "", branch.Item1.ItemName ?? "", gatewayKey ?? "");
                    }
                }
            }
        }

        public void Sweep()
        {
            var jobs = new List<JobContext>();
            try
            {

                using (var scope = _lifetimeScope.BeginLifetimeScope())
                {
                    var scheduledTaskRecordService = scope.Resolve<IScheduledTaskRecordService>();
                    var transcationManager = scope.Resolve<ITransactionManager>();
                    transcationManager.RequireNew();
                    var tasks = scheduledTaskRecordService.Fetch(x => x.Enabled == true && x.NextTime.HasValue && x.Status == null);

                    foreach (var task in tasks)
                    {
                        task.Status = TaskStatus.Pending.ToString();
                        jobs.Add(Process(new JobContext()
                        {
                            TaskId = task.Id,
                            GatewayKey = task.GatewayKey,
                            ItemPath = task.ItemPath,
                            ItemName = task.ItemName
                        }));
                        scheduledTaskRecordService.UpdateRecord(task);
                    }
                }

                using (var scope = _lifetimeScope.BeginLifetimeScope())
                {
                    var scheduledTaskRecordService = scope.Resolve<IScheduledTaskRecordService>();
                    var transcationManager = scope.Resolve<ITransactionManager>();
                    transcationManager.RequireNew();
                    foreach (var jobContext in jobs)
                    {
                        jobContext.Job.Wait(-1);
                        var task = scheduledTaskRecordService.GetRecord(jobContext.TaskId);
                        task.NextTime = null;
                        task.LastTime = jobContext.LastTime;
                        task.Status = jobContext.Status.ToString() + " " + jobContext.ElapsedEtime.ToString();
                        scheduledTaskRecordService.UpdateRecord(task);
                    }
                }
            }
            catch (Exception e)
            {
                Logger.Error(e, e.Message);
            }
        }
    }
}