
# Mahout and Scalding for poker collusion detection

While reading a very bright book on Mahout, Mahout in Action (which is a great hands-on intro to machine learning, as well), one of the examples caught my attention.
The authors of the book were using the well-known K-means clustering algorithm for finding similar users on stackoverflow.com, where the criterion of similarity was the set of the authors of questions/answers the users were up-/downvoting.

In very simple words, the K-means algorithm iteratively finds clusters of points/vectors located close to each other in a multidimensional space. Applied to the problem of finding similar users on StackOverflow, we assume that every axis in the multi-dimensional space is a user, where the distance from zero is the sum of points awarded to the questions/answers given by other users (those dimensions are also often called “features”, and the distance is a “feature weight”).
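To make the idea concrete, here is a minimal, self-contained sketch of the K-means loop on small 2-D points (the data is made up for illustration; the actual job below delegates all of this to Mahout's KMeansDriver):

```scala
// Minimal K-means sketch (not Mahout's implementation): assign each point
// to the nearest centroid, recompute centroids as cluster means, repeat.
object KMeansSketch {
  type Vec = Array[Double]

  // Euclidean distance between two vectors
  def dist(a: Vec, b: Vec): Double =
    math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)

  // component-wise mean of a group of vectors
  def mean(vs: Seq[Vec]): Vec =
    vs.transpose.map(_.sum / vs.size).toArray

  def cluster(points: Seq[Vec], centroids: Seq[Vec], iterations: Int): Seq[Vec] =
    if (iterations == 0) centroids
    else {
      // assignment step: group points by their nearest centroid
      val assigned = points.groupBy(p => centroids.minBy(c => dist(p, c)))
      // update step: move each centroid to the mean of its cluster
      val updated = centroids.map(c => assigned.get(c).map(mean).getOrElse(c))
      cluster(points, updated, iterations - 1)
    }

  // two obvious groups of points, and one seed centroid near each
  val points = Seq(Array(0.0, 0.0), Array(0.0, 1.0), Array(9.0, 9.0), Array(9.0, 10.0))
  val result = cluster(points, Seq(Array(0.0, 0.0), Array(9.0, 9.0)), 10)
}
```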

Obviously, the same approach can be applied to one of the most sophisticated problems in massively-multiplayer online poker – collusion detection. We make a simple assumption: if two or more players have played too many games with each other (taking into account that any of the players could simply be an active player who plays a lot of games with everyone), they might be in collusion.

We break a massive set of players into small, tight clusters (preferably with 2-8 players in each) using the K-means clustering algorithm. In the basic implementation that we will walk through below, every user is represented as a vector, whose axes are the other players she has played with (and the weight of each feature is the number of games played together).
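In plain Scala, that representation can be sketched like this (the hand data and player IDs are made up for illustration; the actual pipeline below builds Mahout vectors out of HBase records):

```scala
// Hypothetical sketch of the player-as-vector representation:
// each player maps to a sparse vector (opponent -> hands played together).
object VectorSketch {
  // each element is the set of players seated in one hand (made-up IDs)
  val hands: Seq[Set[Int]] = Seq(
    Set(1003, 1004), Set(1003, 1004), Set(1003, 1005))

  // count (player, opponent) co-occurrences across all hands
  val vectors: Map[Int, Map[Int, Int]] =
    hands
      .flatMap(h => for (p <- h; o <- h if o != p) yield (p, o))
      .groupBy(_._1)
      .map { case (p, pairs) =>
        p -> pairs.groupBy(_._2).map { case (o, xs) => o -> xs.size }
      }
}
```

Player 1003 here ends up with the sparse vector `Map(1004 -> 2, 1005 -> 1)`: two hands against 1004, one against 1005.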

## Stage 1. Building a dictionary

As the first step, we need to build a dictionary/enumeration of all the players involved in the subset of hand history that we analyze:

```scala
// extract user ID from a hand history record
val userId = (playerHistory: PlayerHandHistory) =>
  new Text(playerHistory.getUserId.toString)

// Builds a basic dictionary (enumeration, in fact) of all the players that participated
// in the selected subset of hand history records
class Builder(args: Args) extends Job(args) {

  // input tap is an HTable with hand history entries: hand history id -> hand history record, serialized with ProtoBuf
  val input = new HBaseSource("hand", args("hbasehost"), 'handId, Array("d"), Array('blob))
  // output tap - plain text file with player IDs
  val output = TextLine(args("output"))

  input
    .flatMap('blob -> 'player) {
      // every hand history record contains the list of players that participated in the hand
      blob: Array[Byte] => // at the first stage, we simply extract the list of IDs and add it to the flat list
        HandHistory.parseFrom(blob).getPlayerList.map(userId)
    }
    .unique('player)  // remove duplicate user IDs
    .project('player) // leave only the 'player column in the tuple
    .write(output)

}
```
```
1003
1004
1005
1006
1007
...
```

## Stage 2. Adding indices to the dictionary

Secondly, we map every user ID to the position/index of the player in the vector.

```scala
class Indexer(args: Args) extends Job(args) {

  val input = TextLine(args("input"))
  val output = WritableSequenceFile(args("output"), classOf[Text], classOf[IntWritable],
    'userId -> 'idx)

  input
    .map(('offset -> 'line) -> ('userId -> 'idx)) {
      // dictionary lines are read with indices from the TextLine source
      // out of the box; for some reason, in my case, indices were multiplied by 5, so I had to divide them
      tuple: (Int, String) => (new Text(tuple._2.toString) -> new IntWritable(tuple._1 / 5))
    }
    .project('userId -> 'idx) // only the userId -> index tuple is passed to the output
    .write(output)

}
```
```
1003 0
1004 1
1005 2
1006 3
1007 4
...
```

## Stage 3. Building vectors

We build the vectors that will be passed as input to the K-means clustering algorithm. As noted above, every position in the vector corresponds to another player the player has played with:

```scala
/**
 * K-means clustering algorithm requires the input to be represented as vectors.
 * In our case, the vector itself represents the player, where the other users that the
 * player has played with are the vector axes/features (the weight of a feature is the
 * number of games played together)
 * User: remeniuk
 */
class VectorBuilder(args: Args) extends Job(args) {

  import Dictionary._

  // initializes the dictionary pipe
  val dictionary = TextLine(args("dictionary"))
    .map(('offset -> 'line) -> ('userId -> 'dictionaryIdx)) {
      tuple: (Int, String) =>
        (tuple._2 -> tuple._1 / 5)
    }
    .project('userId -> 'dictionaryIdx)

  val input = new HBaseSource("hand", args("hbasehost"), 'handId, Array("d"), Array('blob))
  val output = WritableSequenceFile(args("output"), classOf[Text], classOf[VectorWritable],
    'player1Id -> 'vector)

  input
    .flatMap('blob -> ('player1Id -> 'player2Id)) {
      // builds a flat list of pairs of users that played together
      blob: Array[Byte] =>
        val playerList = HandsHistoryCoreInternalDomain.HandHistory.parseFrom(blob).getPlayerList.map(userId)
        playerList.flatMap {
          playerId =>
            playerList.filterNot(_ == playerId).map(otherPlayerId => (playerId -> otherPlayerId.toString))
        }
    }
    // joins the list of pairs of users that played together with the dictionary,
    // so that the second member of the tuple (ID of the second player) is replaced
    // with the index in the dictionary
    .joinWithSmaller('player2Id -> 'userId, dictionary)
    .groupBy('player1Id -> 'dictionaryIdx) {
      group => group.size // groups pairs of players that played together, counting the number of hands
    }
    .map(('player1Id, 'dictionaryIdx, 'size) -> ('playerId, 'partialVector)) {
      tuple: (String, Int, Int) =>
        val partialVector = new NamedVector(
          new SequentialAccessSparseVector(args("dictionarySize").toInt), tuple._1)
        // turns a tuple of two users into a vector with one feature
        partialVector.set(tuple._2, tuple._3)
        (new Text(tuple._1), new VectorWritable(partialVector))
    }
    .groupBy('player1Id) {
      // combines partial vectors into one vector that represents the number of hands played with other players
      group => group.reduce('partialVector -> 'vector) {
        (left: VectorWritable, right: VectorWritable) =>
          new VectorWritable(left.get.plus(right.get))
      }
    }
    .write(output)

}
```
```
1003 {3:5.0,5:4.0,6:4.0,9:4.0}
1004 {2:4.0,4:4.0,8:4.0,37:4.0}
1005 {1:4.0,4:5.0,8:4.0,37:4.0}
1006 {0:5.0,5:4.0,6:4.0,9:4.0}
1007 {1:4.0,2:5.0,8:4.0,37:4.0}
...
```
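Each dumped line pairs a player ID with a sparse vector in `{index:weight}` notation; the indices can be resolved back to user IDs through the indexed dictionary. A small sketch of that decoding (the dictionary contents and the helper names here are hypothetical, for illustration only):

```scala
// Hypothetical decoder for a dumped sparse vector such as "{3:5.0,5:4.0}":
// indices are resolved back to user IDs via the indexed dictionary.
object VectorDump {
  // made-up fragment of the index -> userId dictionary
  val dictionary = Map(0 -> "1003", 1 -> "1004", 2 -> "1005", 3 -> "1006", 5 -> "1007")

  // parses "{3:5.0,5:4.0}" into Map(index -> weight)
  def parse(s: String): Map[Int, Double] =
    s.stripPrefix("{").stripSuffix("}").split(",").map { entry =>
      val Array(idx, w) = entry.split(":")
      idx.toInt -> w.toDouble
    }.toMap

  // replaces vector indices with the corresponding user IDs
  def decode(s: String): Map[String, Double] =
    parse(s).map { case (i, w) => dictionary(i) -> w }

  val decoded = decode("{3:5.0,5:4.0}")
}
```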

The entire workflow required to vectorize the input:

```scala
val conf = new Configuration

// the path where the vectors will be stored
val vectorsPath = new Path("job/vectors")
// enumeration of all users involved in a selected subset of hand history records
val dictionaryPath = new Path("job/dictionary")
// text file with the dictionary size
val dictionarySizePath = new Path("job/dictionary-size")
// indexed dictionary (every user ID in the dictionary is mapped to an index, from 0)
val indexedDictionaryPath = new Path("job/indexed-dictionary")

println("Building dictionary...")
// extracts IDs of all the users participating in the selected subset of hand history records
Tool.main(Array(classOf[Dictionary.Builder].getName, "--hdfs",
  "--hbasehost", "localhost", "--output", dictionaryPath.toString))
// adds indices to the dictionary
Tool.main(Array(classOf[Dictionary.Indexer].getName, "--hdfs",
  "--input", dictionaryPath.toString, "--output", indexedDictionaryPath.toString))
// calculates the dictionary size, and stores it to the FS
Tool.main(Array(classOf[Dictionary.Size].getName, "--hdfs",
  "--input", dictionaryPath.toString, "--output", dictionarySizePath.toString))

// reads the dictionary size back from the file system
val fs = FileSystem.get(dictionaryPath.toUri, conf)
val dictionarySize = scala.io.Source.fromInputStream(
  fs.open(new Path(dictionarySizePath, "part-00000"))).mkString.trim.toInt

println("Vectorizing...")
// builds vectors (player -> other players in the game)
// IDs of other players (in the vectors) are replaced with indices taken from the dictionary
Tool.main(Array(classOf[VectorBuilder].getName, "--hdfs",
  "--dictionary", dictionaryPath.toString, "--hbasehost", "localhost",
  "--output", vectorsPath.toString, "--dictionarySize", dictionarySize.toString))
```

## Stage 4. Generating n random clusters

Random clusters/centroids are the entry point for the K-means algorithm:

```scala
// randomly selected clusters that will be passed as an input to K-means
val inputClustersPath = new Path("job/input-clusters")
val distanceMeasure = new EuclideanDistanceMeasure

println("Making random seeds...")
// build 30 initial random clusters/centroids
RandomSeedGenerator.buildRandom(conf, vectorsPath, inputClustersPath, 30, distanceMeasure)
```

## Stage 5. Running the K-means algorithm

With every iteration, K-means finds better centroids and clusters. As a result, we get 30 clusters of players that have played with each other most often:

```scala
// clustering results
val outputClustersPath = new Path("job/output-clusters")
// textual dump of the clustering results
val dumpPath = "job/dump"

println("Running K-means...")
// runs the K-means algorithm with up to 20 iterations to find clusters of colluding players
// (collusion is assumed on the basis of the number of hands played together with other player[s])
KMeansDriver.run(conf, vectorsPath, inputClustersPath, outputClustersPath,
  new CosineDistanceMeasure(), 0.01, 20, true, 0, false)

println("Printing results...")

// dumps clusters to a text file
val clusterizationResult = finalClusterPath(conf, outputClustersPath, 20)
val clusteredPoints = new Path(outputClustersPath, "clusteredPoints")
val clusterDumper = new ClusterDumper(clusterizationResult, clusteredPoints)
clusterDumper.setNumTopFeatures(10)
clusterDumper.setOutputFile(dumpPath)
clusterDumper.setTermDictionary(new Path(indexedDictionaryPath, "part-00000").toString,
  "sequencefile")
clusterDumper.printClusters(null)
```

## Results

Let’s now open “job/dump” – this file contains textual dumps of all the clusters generated by K-means. Here’s a small fragment of the file:

```
VL-0{n=5 c=[1003:3.400, 1006:3.400, 1008:3.200, 1009:3.200, 1012:3.200] r=[1003:1.744, 1006:1.744, 1008:1.600, 1009:1.600, 1012:1.600]}
Top Terms:
1006                                    =>                 3.4
1003                                    =>                 3.4
1012                                    =>                 3.2
1009                                    =>                 3.2
1008                                    =>                 3.2

VL-15{n=1 c=[1016:4.000, 1019:3.000, 1020:3.000, 1021:3.000, 1022:3.000, 1023:3.000, 1024:3.000, 1025:3.000] r=[]}
Top Terms:
1016                                    =>                 4.0
1025                                    =>                 3.0
1024                                    =>                 3.0
1023                                    =>                 3.0
1022                                    =>                 3.0
1021                                    =>                 3.0
1020                                    =>                 3.0
1019                                    =>                 3.0
```

As we can see, two clusters of players have been detected: one with eight players that have played a lot of games with each other, and a second one with four players.

Reference: Poker collusion detection with Mahout and Scalding from our JCG partner Vasil Remeniuk at the Vasil Remeniuk blog.

### Vasil Remeniuk
