xMapReduce Examples
This page shows how to use xMapReduce for common data processing tasks: word count, sort, and group-by.
The examples are adapted from the implementations in the xmapreduce/application directory; some helper methods are omitted for brevity.
Word Count Implementation
// Word-count mapper: splits each input string into words and emits (word, 1) per word.
class MyMapper : public xmapreduce::Mapper<xmapreduce::string, xmapreduce::KeyValuePair<xmapreduce::string, int>>
{
public:
    // Runs processText() on each of the `size` strings in `str`; all emitted
    // pairs are written into `kvStore`.
    void mapImpl(xmapreduce::string* str, int size, xkvstore::KVStore<xmapreduce::string, int>* kvStore) override
    {
        int index = 0;
        while (index < size)
        {
            processText(str[index], kvStore);
            ++index;
        }
    }

private:
    // Walks through `text` one word at a time and inserts (word, 1) into
    // `kvStore` for every non-empty word.
    // NOTE(review): `pos` is never modified in this function, so the loop only
    // makes progress if findNextWord() advances it through a reference
    // parameter -- confirm against the omitted helper.
    void processText(const xmapreduce::string& text, xkvstore::KVStore<xmapreduce::string, int>* kvStore)
    {
        size_t pos = 0;
        size_t len = text.length();
        while (pos < len)
        {
            size_t wordStart;
            size_t wordLength;
            if (!findNextWord(text, pos, len, wordStart, wordLength))
            {
                // No further word was found: stop scanning this string.
                break;
            }

            xmapreduce::string word = extractWord(text, wordStart, wordLength);
            if (word.length() > 0)
            {
                kvStore->insert(word, 1);
            }
        }
    }

    // Additional helper methods (findNextWord, extractWord) omitted for brevity
};
// Word-count reducer: adds up the counts emitted for each word.
class MyReducer : public xmapreduce::Reducer<xkvstore::KeyValuePair<xmapreduce::string, int>, xkvstore::KeyValuePair<xmapreduce::string, int>>
{
public:
    // Sums the values of all pairs in `inputKVStore` that share a key, then
    // inserts one (key, total) pair per distinct key into `outputKVStore`.
    // Pairs are emitted in std::unordered_map iteration order, which is unspecified.
    void reduceImpl(xkvstore::KVStore<xmapreduce::string, int>* inputKVStore, xkvstore::KVStore<xmapreduce::string, int>* outputKVStore) override
    {
        std::unordered_map<xmapreduce::string, int> totals;
        for (const auto& entry : *inputKVStore)
        {
            // operator[] value-initializes a missing total to 0 before adding.
            totals[entry.getKey()] += entry.getValue();
        }

        for (auto it = totals.cbegin(); it != totals.cend(); ++it)
        {
            outputKVStore->insert(it->first, it->second);
        }
    }
};
Example usage:
// Build a MapReduce job that uses MyMapper and MyReducer.
// NOTE(review): the exact meaning of each builder option (numMaps, numReduces,
// numTasks, resultCapacity) is defined by the xMapReduce API and is not shown
// on this page -- confirm against the library documentation.
auto mapReduce = xmapreduce::MapReduce<MyMapper, MyReducer>::builder()
.numMaps(8)
.numReduces(4)
.numTasks(2)
.resultCapacity(1024)
.build();
// Run the job over dataVec32 (declared elsewhere). The sizeof-based element
// count is only correct if dataVec32 is a real array, not a pointer.
mapReduce->execute(dataVec32, sizeof(dataVec32) / sizeof(dataVec32[0]));
// Collect every key/value pair from the job's result store.
auto results = mapReduce->getResultKVStore()->getAllKeyValuePairs();
Sort Implementation
// Sort mapper: forwards every input integer unchanged, all under the single key "key".
class SortMapper : public xmapreduce::Mapper<int, xmapreduce::KeyValuePair<xmapreduce::string, int>>
{
public:
    // Inserts each of the `elementCount` integers in `data`, in input order,
    // into `kvStore` under the key "key".
    void mapImpl(int* data, int elementCount, xkvstore::KVStore<xmapreduce::string, int>* kvStore) override
    {
        int* cursor = data;
        for (int remaining = elementCount; remaining > 0; --remaining)
        {
            kvStore->insert(xmapreduce::string("key"), *cursor);
            ++cursor;
        }
    }
};
// Sort reducer: emits the incoming pairs ordered by ascending value.
class SortReducer : public xmapreduce::Reducer<xkvstore::KeyValuePair<xmapreduce::string, int>, xkvstore::KeyValuePair<xmapreduce::string, int>>
{
public:
    // Copies every pair from `inputKVStore` into a vector, sorts the vector by
    // value in ascending order, and inserts the pairs into `outputKVStore` in
    // that order. std::sort is not stable, so pairs with equal values may come
    // out in any relative order.
    void reduceImpl(xkvstore::KVStore<xmapreduce::string, int>* inputKVStore, xkvstore::KVStore<xmapreduce::string, int>* outputKVStore) override
    {
        using Pair = xkvstore::KeyValuePair<xmapreduce::string, int>;

        std::vector<Pair> ordered;
        for (const auto& entry : *inputKVStore)
        {
            ordered.push_back(entry);
        }

        const auto byAscendingValue = [](const Pair& lhs, const Pair& rhs)
        {
            return lhs.getValue() < rhs.getValue();
        };
        std::sort(ordered.begin(), ordered.end(), byAscendingValue);

        for (auto it = ordered.cbegin(); it != ordered.cend(); ++it)
        {
            outputKVStore->insert(it->getKey(), it->getValue());
        }
    }
};
Example usage:
// Unsorted input: the 18 integers 1..18 in mostly descending order.
std::vector<int> dataList = {16, 18, 17, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1};
// Build a MapReduce job that uses SortMapper and SortReducer.
// NOTE(review): the meaning of threshold(8) and the other builder options is
// defined by the xMapReduce API and is not shown on this page -- confirm.
auto mapReduce = xmapreduce::MapReduce<SortMapper, SortReducer>::builder()
.numMaps(16)
.numReduces(16)
.numTasks(4)
.resultCapacity(1024)
.threshold(8)
.build();
// NOTE(review): this passes a std::vector directly, whereas SortMapper::mapImpl
// receives (int*, int) and the other examples call execute(pointer, count);
// presumably execute() has a container overload -- confirm.
mapReduce->execute(dataList);
// Results will contain the sorted list
Group By Implementation
// Data structure for sales information
// One input record for the Group By example. It is brace-initialized in the
// usage snippet, so the field order below is part of its interface.
struct SalesData
{
int productId;                // Product identifier; not read by SalesMapper
xmapreduce::string category;  // Grouping key emitted by SalesMapper
int price;                    // Not read by SalesMapper; unit not specified here
int quantity;                 // Value emitted by SalesMapper and summed by SalesReducer
};
// Group-by mapper: emits (category, quantity) for every sales record.
class SalesMapper : public xmapreduce::Mapper<SalesData, xmapreduce::KeyValuePair<xmapreduce::string, int>>
{
public:
    // For each of the `size` records in `data`, inserts the record's category
    // as the key and its quantity as the value into `kvStore`.
    void mapImpl(SalesData* data, int size, xkvstore::KVStore<xmapreduce::string, int>* kvStore) override
    {
        for (int row = 0; row < size; ++row)
        {
            SalesData& sale = data[row];
            kvStore->insert(sale.category, sale.quantity);
        }
    }
};
// Group-by reducer: totals the quantities emitted for each category.
class SalesReducer : public xmapreduce::Reducer<xkvstore::KeyValuePair<xmapreduce::string, int>, xkvstore::KeyValuePair<xmapreduce::string, int>>
{
public:
    // Adds up the values of all pairs in `inputKVStore` that share a category
    // key, then inserts one (category, total) pair per distinct category into
    // `outputKVStore`. Pairs are emitted in std::unordered_map iteration order,
    // which is unspecified.
    void reduceImpl(xkvstore::KVStore<xmapreduce::string, int>* inputKVStore, xkvstore::KVStore<xmapreduce::string, int>* outputKVStore) override
    {
        std::unordered_map<xmapreduce::string, int> quantityByCategory;
        for (const auto& record : *inputKVStore)
        {
            // operator[] value-initializes a missing total to 0 before adding.
            quantityByCategory[record.getKey()] += record.getValue();
        }

        for (auto it = quantityByCategory.cbegin(); it != quantityByCategory.cend(); ++it)
        {
            outputKVStore->insert(it->first, it->second);
        }
    }
};
Example usage:
// Sample records; fields are in SalesData declaration order:
// {productId, category, price, quantity}.
SalesData salesData[] = {
{1001, xmapreduce::string("Electronics"), 5000, 10},
{1002, xmapreduce::string("Clothing"), 3000, 15},
{1003, xmapreduce::string("Electronics"), 8000, 5},
{1004, xmapreduce::string("Clothing"), 2000, 20},
{1005, xmapreduce::string("Food"), 1000, 30}
};
// Build a MapReduce job that uses SalesMapper and SalesReducer.
// Unlike the other examples, numTasks() is not set here.
// NOTE(review): presumably the builder falls back to a default -- confirm.
auto mapReduce = xmapreduce::MapReduce<SalesMapper, SalesReducer>::builder()
.numMaps(4)
.numReduces(2)
.resultCapacity(1024)
.build();
// salesData is a true array in this scope, so sizeof yields its element count (5).
mapReduce->execute(salesData, sizeof(salesData) / sizeof(salesData[0]));
// Results will contain total quantities by category
// Summing the sample data by hand gives Electronics=15, Clothing=35, Food=30
// (assumes all pairs for one category reach the same reducer -- confirm).