Sitecore Page Recommender: Collating and merging data, and training our model (via the Cortex Processing Engine)


This blog post is part of a larger series that looks at the steps to create a
Page Recommendation Engine for Sitecore.


Introduction

After configuring the pages to measure engagement (as described in Measuring user engagement on a page), you should start to see these engagement events showing up in XConnect (for details on how to find these events in the xDB databases, see this article xxx).

An illustration of how the raw event data might be stored is shown below:

Table 1: Raw Event Data

User | Page   | Goal        | Engagement Value
-----|--------|-------------|-----------------
Bill | Design | Clicked Tab | 12
Bob  | Design | Clicked Tab | 21
Bill | Art    | Clicked Tab | 34
Bill | Design | Clicked Tab | 12
Bill | Design | Clicked Tab | 34
Bob  | Art    | Clicked Tab | 21
Bob  | Art    | Clicked Tab | 12

Before we can use this data with our recommendation service, though, we need to project it into a format similar to the one below:

Table 2: Projected Event Data

User | Page   | Total Engagement
-----|--------|-----------------
Bill | Art    | 4
Bill | Design | 9
Bob  | Art    | 3
Bob  | Design | 1
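
Conceptually, this projection is just a group-by over two keys with a sum. As a purely illustrative sketch (the in-memory data and types below are hypothetical; the real projection runs inside the processing engine, as shown later in this post):

using System;
using System.Linq;

// Illustrative only: group raw events by (User, Page) and sum the
// engagement value for each group.
var rawEvents = new[]
{
    (User: "Bill", Page: "Design", EngagementValue: 12),
    (User: "Bob",  Page: "Design", EngagementValue: 21),
    (User: "Bill", Page: "Art",    EngagementValue: 34),
};

var projected = rawEvents
    .GroupBy(e => new { e.User, e.Page })
    .Select(g => new { g.Key.User, g.Key.Page, TotalEngagement = g.Sum(e => e.EngagementValue) });

foreach (var row in projected)
{
    Console.WriteLine($"{row.User} | {row.Page} | {row.TotalEngagement}");
}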

Collating the data with Cortex Processing Engine

In order to collate the data, we need to register worker tasks with the processing engine. To trigger the registration we can use a scheduled task or cron job; a minimal sketch of such an agent is shown below, and the registration code itself is covered step by step in the sections that follow.
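
As a hedged example, the registration code could be wrapped up in an agent class like the one below (the class and method names are hypothetical, and the agent would also need wiring up via an <agent> configuration patch, not shown here):

using System.Threading.Tasks;

// Hypothetical agent class, invoked on a schedule by the Sitecore scheduler.
public class RegisterRecommendationTasksAgent
{
    // Sitecore agents run synchronously, so we block on the async work.
    public void Run()
    {
        RegisterTasksAsync().GetAwaiter().GetResult();
    }

    private static async Task RegisterTasksAsync()
    {
        // Register the projection, merge and training tasks here,
        // exactly as shown in the sections that follow.
        await Task.CompletedTask;
    }
}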

Projection

First of all we need to get an instance of the task manager (from DI). This is what we will use to register the tasks (in the ProcessingEngineTasks database).

We then define the query that will be used to retrieve our raw data from XConnect (i.e. Table 1 above) and pass it into our dataSourceOptions (with defaults set). The expand options indicate that we need details about contacts in addition to the interactions.

var taskManager = ServiceLocator.ServiceProvider.GetService<ITaskManager>();
var xConnectClient = SitecoreXConnectClientConfiguration.GetClient();
var taskTimeout = TimeSpan.FromHours(12);
var storageTimeout = TimeSpan.FromHours(12);
var query = xConnectClient.Interactions.Where(interaction =>
        interaction.Events.OfType<Goal>().Any()
        && interaction.Events.Any(ev => _eventsList.Contains(ev.DefinitionId))    // list of goals we are interested in
        && interaction.StartDateTime > DateTime.UtcNow.AddHours(-4380)   // last 6 months (XConnect timestamps are stored in UTC)
);
var expandOptions = new InteractionExpandOptions
{
    Contact = new RelatedContactExpandOptions()
};
query = query.WithExpandOptions(expandOptions);
var searchRequest = query.GetSearchRequest();

var dataSourceOptions = new InteractionSearchDataSourceOptionsDictionary(
    searchRequest, // searchRequest
    30, // maxBatchSize
    50 // defaultSplitItemCount
);

Next we define the projection worker options. Here we pass in our IModel (which defines the projection itself), the schema name (a way to group tables) and options used to determine the name of the table where the first set of projections is stored. Note that there could be multiple contactPages tables created by parallel projection workers, because the projection runs as a distributed task.

var projectionOptions = new InteractionProjectionWorkerOptionsDictionary(
    typeof(PageRecommendationModel).AssemblyQualifiedName, // modelTypeString
    storageTimeout, // timeToLive
    "train-recommendation", // schemaName
    new Dictionary<string, string> // modelOptions
    {
        { PageRecommendationModel.OptionTableName, "contactPages" }
    }
);

The model mentioned above has the definition shown below. Here we define the projection, selecting ItemId, EngagementValue and ContactId from the result of our initial data query.

We also specify that we want to use two keys, ItemId and ContactId (i.e. group by each unique combination of these values), and for each combination measure the total Engagement.

public IProjection<Interaction> Projection =>
    Sitecore.Processing.Engine.Projection.Projection.Of<Interaction>()
        .CreateTabular(_tableName,
            interaction => interaction.Events.OfType<Goal>().Select(e => new
            {
                ItemId = e.ItemId,
                EngagementValue = e.EngagementValue,
                ContactId = interaction.Contact.Id
            }),
            cfg => cfg
                .Key("ItemId", x => x.ItemId)
                .Key("ContactId", x => x.ContactId)
                .Measure("Engagement", x => x.EngagementValue)
        );
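
For completeness, a hedged sketch of how the model's table name might be wired up is below. The options-dictionary constructor follows the usual processing engine pattern, but its exact shape (and the option key's value) is an assumption; the class also implements IModel<Interaction>, whose remaining training and evaluation members are omitted here:

using System.Collections.Generic;

// Sketch only: shows how OptionTableName and _tableName could be wired up.
public partial class PageRecommendationModel
{
    // Key used in the modelOptions dictionary above (the value is an assumption).
    public const string OptionTableName = "TableName";

    private readonly string _tableName;

    // The processing engine passes in the modelOptions registered
    // with the projection worker.
    public PageRecommendationModel(IReadOnlyDictionary<string, string> options)
    {
        _tableName = options[OptionTableName];
    }
}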

Finally, we wrap all of the above up in a call to the task manager, passing in each of the elements defined above.

var projectionTaskId = await taskManager.RegisterDistributedTaskAsync(
    dataSourceOptions, // datasourceOptions
    projectionOptions, // workerOptions
    null, // prerequisiteTaskIds
    taskTimeout // expiresAfter
);

Merging

The above projection task will result in multiple 'contactPages' tables, because the task is distributed and so can be picked up by multiple workers. Once projection has finished, we need to chain a further task to merge the results.

var mergeOptions = new MergeWorkerOptionsDictionary(
    "contactPagesFinal", // tableName
    "contactPages", // prefix
    storageTimeout, // timeToLive
    "train-recommendation" // schemaName
);

var mergeTaskId = await taskManager.RegisterDeferredTaskAsync(
    mergeOptions, // workerOptions
    new[] // prerequisiteTaskIds
    {
        projectionTaskId
    },
    taskTimeout // expiresAfter
);

In the above code, you can see that the merge task will look for tables prefixed "contactPages" (in the train-recommendation schema), then merge the data and store the result in the "contactPagesFinal" table. You can also see that we register a prerequisite task, which means the merge will not start until the projection task is complete.

With the merge task complete, we should now have our XConnect event data collated into a table called contactPagesFinal, in a format similar to Table 2 (Projected Event Data). You can see this stored in the ProcessingEngineStorage database.

Training

With our collated data in place, we now need to send it to our machine learning service to train our model. We will do this by chaining another task to read the data from the contactPagesFinal table and send it to the service.

To do this we create a custom worker, PageTrainingDeferredWorker, that implements IDeferredWorker. We pass this worker type into a DeferredWorkerOptionsDictionary, together with the information needed to find our collated data (i.e. schema name and table name). The training task is registered in a similar manner to before, with the merge task assigned as a prerequisite.

var trainingOptions = new DeferredWorkerOptionsDictionary(
    typeof(PageTrainingDeferredWorker).AssemblyQualifiedName, // workerType
    new Dictionary<string, string> // options
    {
        { PageTrainingDeferredWorker.OptionSourceTableName, "contactPagesFinal" },
        { PageTrainingDeferredWorker.OptionTargetTableName, string.Empty },
        { PageTrainingDeferredWorker.OptionSchemaName, "train-recommendation" },
        { PageTrainingDeferredWorker.OptionLimit, "5" }
    }
);

var trainingTaskId = await taskManager.RegisterDeferredTaskAsync(
    trainingOptions, // workerOptions
    new[] // prerequisiteTaskIds
    {
        mergeTaskId
    },
    taskTimeout // expiresAfter
);

The bulk of the work is done in the RunAsync method of our custom class. Here we can see that it reads all the data from the contactPagesFinal table and adds it to a list of PageEngagement objects, which is then sent to our machine learning service to train the model (see Creating an ML.net model with more functionality for use as our service for more information).

public async Task RunAsync(CancellationToken token)
{
    // Read the collated rows from the source (contactPagesFinal) table,
    // honouring the cancellation token passed in by the engine.
    var sourceRows = await _tableStore.GetRowsAsync(_sourceTableName, token);
    var data = new List<PageEngagement>();
    while (await sourceRows.MoveNext())
    {
        foreach (var row in sourceRows.Current)
        {
            data.Add(new PageEngagement
            {
                ContactId = row["ContactId"].ToString(),
                PageId = row["ItemId"].ToString(),
                Engagement = float.Parse(row["Engagement"].ToString())
            });
        }
    }

    // Send the collated engagement data to the machine learning service.
    _machinelearning.Train(data);
}
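
For context, below is a hedged sketch of the rest of the worker and the PageEngagement DTO. The constructor signature and the ITableStore wiring are assumptions based on the common deferred worker pattern, and IMachineLearningService is a hypothetical stand-in for the ML.net service from the earlier post:

using System.Collections.Generic;
using Sitecore.Processing.Engine.Storage.Abstractions;

// Sketch only: the RunAsync method above forms the other part of this class.
public partial class PageTrainingDeferredWorker
{
    // Option keys used when building the DeferredWorkerOptionsDictionary above
    // (the string values are assumptions).
    public const string OptionSourceTableName = "SourceTableName";
    public const string OptionTargetTableName = "TargetTableName";
    public const string OptionSchemaName = "SchemaName";
    public const string OptionLimit = "Limit";

    private readonly string _sourceTableName;
    private readonly ITableStore _tableStore;                   // assumed to be resolved via DI
    private readonly IMachineLearningService _machinelearning;  // hypothetical service interface

    public PageTrainingDeferredWorker(
        ITableStore tableStore,
        IMachineLearningService machineLearning,
        IReadOnlyDictionary<string, string> options)
    {
        _tableStore = tableStore;
        _machinelearning = machineLearning;
        _sourceTableName = options[OptionSourceTableName];
    }
}

// Hypothetical DTO matching the fields read in RunAsync.
public class PageEngagement
{
    public string ContactId { get; set; }
    public string PageId { get; set; }
    public float Engagement { get; set; }
}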

Summary

In this article, we have seen how to pull the last 6 months of data from XConnect, project only the fields that we are interested in, group it by a unique two-key combination and total the engagement. We then merged the resulting tables into one final data set and sent the information to our machine learning service to train the model.

Next up

With our model trained, all that remains is to select a group of contacts who have engaged with the pages we are interested in within a given period of time. Then, for each of those contacts, we call the machine learning service to see which page it predicts the contact would be most interested in, and store those results as custom facets on each contact.

Next up in the series: Generating page recommendations and storing the results
