xMapReduce Examples

This page provides examples showing how to use xMapReduce for common data processing tasks.

The following examples are adapted from the implementations in the xmapreduce/application directory; some helper code is omitted for brevity.

Word Count Implementation

// Mapper: splits each input string into words and emits one (word, 1) pair per
// occurrence. Summing those pairs into per-word totals is the reducer's job.
class MyMapper : public xmapreduce::Mapper<xmapreduce::string, xmapreduce::KeyValuePair<xmapreduce::string, int>>
{
public:
    // Emits (word, 1) for every word found in str[0 .. size-1].
    //   str     - array of `size` input strings
    //   size    - number of strings in `str` (no iterations when <= 0)
    //   kvStore - destination for the intermediate (word, 1) pairs
    void mapImpl(xmapreduce::string* str, int size, xkvstore::KVStore<xmapreduce::string, int>* kvStore) override
    {
        for (int i = 0; i < size; i++)
        {
            processText(str[i], kvStore);
        }
    }

private:
    // Scans `text` word by word and inserts (word, 1) for each non-empty word.
    void processText(const xmapreduce::string& text, xkvstore::KVStore<xmapreduce::string, int>* kvStore)
    {
        const size_t len = text.length();
        size_t pos = 0;

        while (pos < len)
        {
            size_t start = 0;
            size_t wordLen = 0;

            // Presumably reports the next word at or after `pos` as
            // [start, start + wordLen) and returns false when none remains.
            // It may or may not advance `pos` itself, depending on how the
            // omitted helper is declared.
            if (!findNextWord(text, pos, len, start, wordLen))
            {
                break;
            }

            xmapreduce::string word = extractWord(text, start, wordLen);
            if (word.length() > 0)
            {
                kvStore->insert(word, 1);
            }

            // Guarantee forward progress so the loop always terminates: resume
            // after the word just found (at least one character past `start`
            // for an empty match), unless findNextWord already moved `pos` further.
            const size_t next = start + (wordLen > 0 ? wordLen : 1);
            if (next > pos)
            {
                pos = next;
            }
        }
    }
    // Additional helper methods omitted for brevity
};

// Reducer: adds up every count recorded for a word and writes a single
// (word, total) pair per distinct word. Output order follows the iteration
// order of the intermediate hash map, so it is unspecified.
class MyReducer : public xmapreduce::Reducer<xkvstore::KeyValuePair<xmapreduce::string, int>, xkvstore::KeyValuePair<xmapreduce::string, int>>
{
public:
    void reduceImpl(xkvstore::KVStore<xmapreduce::string, int>* inputKVStore, xkvstore::KVStore<xmapreduce::string, int>* outputKVStore) override
    {
        std::unordered_map<xmapreduce::string, int> totals;

        // Accumulate; a word seen for the first time starts from 0.
        for (const auto& entry : *inputKVStore)
        {
            const int amount = entry.getValue();
            totals[entry.getKey()] += amount;
        }

        // Emit one pair per distinct word.
        for (auto it = totals.begin(); it != totals.end(); ++it)
        {
            outputKVStore->insert(it->first, it->second);
        }
    }
};

Example usage:

auto mapReduce = xmapreduce::MapReduce<MyMapper, MyReducer>::builder()
                     .numMaps(8)
                     .numReduces(4)
                     .numTasks(2)
                     .resultCapacity(1024)
                     .build();

mapReduce->execute(dataVec32, sizeof(dataVec32) / sizeof(dataVec32[0]));
auto results = mapReduce->getResultKVStore()->getAllKeyValuePairs();

Sort Implementation

// Mapper: forwards every input integer unchanged, always under the same key
// "key" (presumably so that all values are grouped for a single reducer).
class SortMapper : public xmapreduce::Mapper<int, xmapreduce::KeyValuePair<xmapreduce::string, int>>
{
public:
    void mapImpl(int* data, int elementCount, xkvstore::KVStore<xmapreduce::string, int>* kvStore) override
    {
        // Nothing to emit for an empty (or negative-sized) chunk.
        if (elementCount <= 0)
        {
            return;
        }

        int* const last = data + elementCount;
        for (int* cursor = data; cursor != last; ++cursor)
        {
            kvStore->insert(xmapreduce::string("key"), *cursor);
        }
    }
};

// Reducer: gathers all input (key, value) pairs, orders them by ascending
// value, and writes them to the output store in that order. std::sort is not
// stable, so pairs with equal values may appear in any relative order.
class SortReducer : public xmapreduce::Reducer<xkvstore::KeyValuePair<xmapreduce::string, int>, xkvstore::KeyValuePair<xmapreduce::string, int>>
{
public:
    void reduceImpl(xkvstore::KVStore<xmapreduce::string, int>* inputKVStore, xkvstore::KVStore<xmapreduce::string, int>* outputKVStore) override
    {
        using Entry = xkvstore::KeyValuePair<xmapreduce::string, int>;

        // Copy everything out of the store so it can be reordered.
        std::vector<Entry> entries;
        for (const auto& kv : *inputKVStore)
        {
            entries.push_back(kv);
        }

        const auto byValue = [](const Entry& lhs, const Entry& rhs)
        {
            return lhs.getValue() < rhs.getValue();
        };
        std::sort(entries.begin(), entries.end(), byValue);

        for (const Entry& entry : entries)
        {
            outputKVStore->insert(entry.getKey(), entry.getValue());
        }
    }
};

Example usage:

std::vector<int> dataList = {16, 18, 17, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1};

auto mapReduce = xmapreduce::MapReduce<SortMapper, SortReducer>::builder()
                     .numMaps(16)
                     .numReduces(16)
                     .numTasks(4)
                     .resultCapacity(1024)
                     .threshold(8)
                     .build();
mapReduce->execute(dataList);
// Results will contain the sorted list

Group By Implementation

// One sales record. Every member has a default initializer so that a
// default-constructed SalesData never holds indeterminate values. The type
// remains an aggregate (C++14+), so brace initialization such as
// {1001, xmapreduce::string("Electronics"), 5000, 10} still works.
struct SalesData
{
    int productId{0};               // product identifier
    xmapreduce::string category{};  // grouping key used by the group-by example
    int price{0};                   // unit/currency not specified here - confirm with data source
    int quantity{0};                // quantity for this record (summed per category in this example)
};

// Mapper: emits one (category, quantity) pair per sales record. Keying by
// category lets the reducer total the quantities for each category.
class SalesMapper : public xmapreduce::Mapper<SalesData, xmapreduce::KeyValuePair<xmapreduce::string, int>>
{
public:
    void mapImpl(SalesData* data, int size, xkvstore::KVStore<xmapreduce::string, int>* kvStore) override
    {
        // No records (or a negative count) means nothing to emit.
        if (size <= 0)
        {
            return;
        }

        SalesData* const end = data + size;
        for (SalesData* record = data; record != end; ++record)
        {
            kvStore->insert(record->category, record->quantity);
        }
    }
};

// Reducer: totals the quantities recorded for each category and writes one
// (category, total) pair per category. Output order follows the hash map's
// iteration order, so it is unspecified.
class SalesReducer : public xmapreduce::Reducer<xkvstore::KeyValuePair<xmapreduce::string, int>, xkvstore::KeyValuePair<xmapreduce::string, int>>
{
public:
    void reduceImpl(xkvstore::KVStore<xmapreduce::string, int>* inputKVStore, xkvstore::KVStore<xmapreduce::string, int>* outputKVStore) override
    {
        std::unordered_map<xmapreduce::string, int> totalsByCategory;

        for (const auto& record : *inputKVStore)
        {
            const int qty = record.getValue();
            totalsByCategory[record.getKey()] += qty;  // first sighting starts at 0
        }

        for (const auto& entry : totalsByCategory)
        {
            outputKVStore->insert(entry.first, entry.second);
        }
    }
};

Example usage:

SalesData salesData[] = {
    {1001, xmapreduce::string("Electronics"), 5000, 10},
    {1002, xmapreduce::string("Clothing"), 3000, 15},
    {1003, xmapreduce::string("Electronics"), 8000, 5},
    {1004, xmapreduce::string("Clothing"), 2000, 20},
    {1005, xmapreduce::string("Food"), 1000, 30}
};

auto mapReduce = xmapreduce::MapReduce<SalesMapper, SalesReducer>::builder()
                     .numMaps(4)
                     .numReduces(2)
                     .resultCapacity(1024)
                     .build();

mapReduce->execute(salesData, sizeof(salesData) / sizeof(salesData[0]));
// Results will contain total quantities by category