Mahout and Scalding for poker collusion detection

While I was reading Mahout in Action, a very good book on Mahout (and a great hands-on introduction to machine learning as well), one of the examples caught my attention.
The authors of the book used the well-known K-means clustering algorithm to find similar users on StackOverflow, where the criterion of similarity was the set of authors of the questions/answers that the users were up-/downvoting.

In very simple terms, the K-means algorithm iteratively finds clusters of points/vectors that are located close to each other in a multidimensional space. Applied to the problem of finding similar users on StackOverflow, we assume that every axis in the multidimensional space is a user, and the distance from zero is the sum of points awarded to the questions/answers given by that user (those dimensions are also often called “features”, and the distance is the “feature weight”).
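
To make the iteration concrete, here is a minimal plain-Scala sketch of K-means on dense vectors. It is an illustration only, not Mahout's implementation: the names `KMeansSketch`, `kMeans` and `nearest` are made up for this example, the initial centroids are passed in by hand, and it runs a fixed number of iterations instead of checking for convergence.

```scala
object KMeansSketch {
  type Point = Vector[Double]

  // squared Euclidean distance between two points of equal dimension
  def squaredDistance(a: Point, b: Point): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // component-wise mean of a non-empty collection of points
  def mean(points: Seq[Point]): Point =
    points.transpose.map(_.sum / points.size).toVector

  // index of the centroid that is closest to the given point
  def nearest(centroids: Seq[Point], p: Point): Int =
    centroids.indices.minBy(i => squaredDistance(centroids(i), p))

  // repeats the "assign points to the nearest centroid, then move the centroid" step
  def kMeans(points: Seq[Point], initial: Seq[Point], iterations: Int): Seq[Point] =
    (1 to iterations).foldLeft(initial) { (centroids, _) =>
      val byCluster = points.groupBy(p => nearest(centroids, p))
      // a centroid that attracted no points keeps its previous position
      centroids.indices.map(i => byCluster.get(i).map(mean).getOrElse(centroids(i)))
    }
}
```

For example, with the points (0,0), (0,1), (10,10), (10,11) and the initial centroids (0,0) and (10,10), the centroids settle in the middle of each pair of points after the first iteration.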

Obviously, the same approach can be applied to one of the most sophisticated problems in massively multiplayer online poker: collusion detection. We make a simple assumption: if two or more players have played too many games with each other (taking into account that any of them could simply be an active player who has played a lot of games with everyone), they might be colluding.

We break a massive set of players into small, tight clusters (preferably, with 2-8 players in each), using the K-means clustering algorithm. In the basic implementation that we will go through below, every user is represented as a vector whose axes are the other players that she has played with (and the weight of the feature is the number of games played together).
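
As a rough illustration of this representation, the plain-Scala sketch below counts, for every player, how many hands she shared with each other player. The object and method names are hypothetical, and the hands are given as in-memory lists of numeric player IDs rather than the HBase records used by the actual jobs.

```scala
object CoPlaySketch {
  // each hand is the list of IDs of the players that took part in it;
  // the result maps every player to (other player -> number of hands played together)
  def coPlayCounts(hands: Seq[Seq[Int]]): Map[Int, Map[Int, Int]] = {
    val pairs = for {
      hand <- hands
      player <- hand
      other <- hand if other != player
    } yield (player, other)

    pairs.groupBy(_._1).map { case (player, playerPairs) =>
      player -> playerPairs.groupBy(_._2).map { case (other, occurrences) =>
        other -> occurrences.size
      }
    }
  }
}
```

For the hands `Seq(1, 2, 3)`, `Seq(1, 2)` and `Seq(2, 3)`, player 1 ends up with the vector `Map(2 -> 2, 3 -> 1)`: two hands with player 2 and one with player 3.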

Stage 1. Building a dictionary

As the first step, we need to build a dictionary/enumeration of all the players involved in the subset of hand history that we analyze:

  // extracts the user ID from a hand history record
  val userId = (playerHistory: PlayerHandHistory) =>
    new Text(playerHistory.getUserId.toString)

  // Builds a basic dictionary (an enumeration, in fact) of all the players that participated in the selected subset
  // of hand history records
  class Builder(args: Args) extends Job(args) {

    // input tap is an HTable with hand history entries: hand history id -> hand history record, serialized with ProtoBuf
    val input = new HBaseSource("hand", args("hbasehost"), 'handId, Array("d"), Array('blob))
    // output tap - plain text file with player IDs
    val output = TextLine(args("output"))

    input.read
      .flatMap('blob -> 'player) {
        // every hand history record contains the list of players that participated in the hand
        blob: Array[Byte] => // at the first stage, we simply extract the list of IDs, and add it to the flat list
      }
      .unique('player) // remove duplicate user IDs
      .project('player) // leave only the 'player column in the tuple
      .write(output)
  }


Stage 2. Adding indices to the dictionary

Secondly, we map user IDs to the position/index of a player in the vector.

  class Indexer(args: Args) extends Job(args) {

    val output = WritableSequenceFile(args("output"), classOf[Text], classOf[IntWritable],
      'userId -> 'idx)

    TextLine(args("input")).read
      .map(('offset -> 'line) -> ('userId -> 'idx)) {
        // dictionary lines are read together with their offsets from the TextLine source out of the box.
        // In Hadoop mode the offset is the byte offset of the line, not its number: every line here is a
        // 4-digit ID plus a newline (5 bytes), so the offsets were multiples of 5 and I had to divide them
        tuple: (Int, String) => (new Text(tuple._2.toString) -> new IntWritable((tuple._1 / 5)))
      }
      .project(('userId -> 'idx)) // only userId -> index tuple is passed to the output
      .write(output)
  }

 1003 0
 1004 1
 1005 2
 1006 3
 1007 4
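
The division by 5 in the job above only works while every ID line has the same length, because the offset that a Hadoop text input reports is a byte position. As a hedged alternative for small dictionaries that fit in memory, the indices can be assigned directly, independent of the ID length. The names below are hypothetical and this sketch is not part of the original jobs:

```scala
object DictionarySketch {
  // assigns consecutive indices, starting from 0, to the distinct player IDs
  // (sorted first, so that the result does not depend on the input order)
  def index(playerIds: Seq[String]): Map[String, Int] =
    playerIds.distinct.sorted.zipWithIndex.toMap
}
```

Calling `DictionarySketch.index(Seq("1004", "1003", "1005", "1003"))` maps 1003 to 0, 1004 to 1 and 1005 to 2, which mirrors the first lines of the indexed dictionary shown above.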

Stage 3. Building vectors

Next, we build the vectors that will be passed as input to the K-means clustering algorithm. As we noted above, every position in the vector corresponds to another player that the player has played with:

/**
 * K-means clustering algorithm requires the input to be represented as vectors.
 * In our case, the vector itself represents the player, where the other users that the player has played with are
 * vector axes/features (the weight of the feature is the number of games played together)
 * User: remeniuk
 */
class VectorBuilder(args: Args) extends Job(args) {

  import Dictionary._

  // initializes dictionary pipe
  val dictionary = TextLine(args("dictionary"))
    .map(('offset -> 'line) -> ('userId -> 'dictionaryIdx)) {
      tuple: (Int, String) =>
        (tuple._2 -> tuple._1 / 5)
    }
    .project(('userId -> 'dictionaryIdx))

  val input = new HBaseSource("hand", args("hbasehost"), 'handId, Array("d"), Array('blob))
  val output = WritableSequenceFile(args("output"), classOf[Text], classOf[VectorWritable],
    'player1Id -> 'vector)

  input.read
    .flatMap('blob -> ('player1Id -> 'player2Id)) {
      // builds a flat list of pairs of users that played together
      blob: Array[Byte] =>
        val playerList = HandsHistoryCoreInternalDomain.HandHistory.parseFrom(blob)
        playerList.flatMap {
          playerId =>
            playerList.filterNot(_ == playerId).map(otherPlayerId => (playerId -> otherPlayerId.toString))
        }
    }
    // joins the list of pairs of users that played together with the dictionary, so that the second member
    // of the tuple (ID of the second player) is replaced with the index in the dictionary
    .joinWithSmaller('player2Id -> 'userId, dictionary)
    .groupBy('player1Id -> 'dictionaryIdx) {
      group => group.size // groups pairs of players that played together, counting the number of hands
    }
    .map(('player1Id, 'dictionaryIdx, 'size) ->('playerId, 'partialVector)) {
      tuple: (String, Int, Int) =>
        // turns a tuple of two users into a vector with one feature
        val partialVector = new NamedVector(
          new SequentialAccessSparseVector(args("dictionarySize").toInt), tuple._1)
        partialVector.set(tuple._2, tuple._3)
        (new Text(tuple._1), new VectorWritable(partialVector))
    }
    .groupBy('player1Id) {
      // combines partial vectors into one vector that represents the number of hands played with other players
      group => group.reduce('partialVector -> 'vector) {
        (left: VectorWritable, right: VectorWritable) =>
          new VectorWritable(left.get.plus(right.get))
      }
    }
    .write(output)
}

 1003 {3:5.0,5:4.0,6:4.0,9:4.0}
 1004 {2:4.0,4:4.0,8:4.0,37:4.0}
 1005 {1:4.0,4:5.0,8:4.0,37:4.0}
 1006 {0:5.0,5:4.0,6:4.0,9:4.0}
 1007 {1:4.0,2:5.0,8:4.0,37:4.0}
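
The last step of the job, where one-feature partial vectors are combined into a single vector per player, can be sketched in plain Scala as follows. A sparse vector is modelled here as a `Map` from dictionary index to weight; this is an assumption made for the illustration and not the Mahout `Vector` API.

```scala
object SparseVectorSketch {
  // dictionary index of the other player -> number of hands played together
  type SparseVector = Map[Int, Double]

  // element-wise sum of two sparse vectors
  def plus(left: SparseVector, right: SparseVector): SparseVector =
    right.foldLeft(left) { case (acc, (idx, weight)) =>
      acc.updated(idx, acc.getOrElse(idx, 0.0) + weight)
    }

  // combines the partial vectors of one player into a single vector
  def combine(partials: Seq[SparseVector]): SparseVector =
    partials.foldLeft(Map.empty[Int, Double])(plus)
}
```

Combining the four partial vectors `Map(3 -> 5.0)`, `Map(5 -> 4.0)`, `Map(6 -> 4.0)` and `Map(9 -> 4.0)` yields the same features as the vector of player 1003 in the output above.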

The entire workflow required to vectorize the input:

    val conf = new Configuration
    conf.set("io.serializations", ","
      + "")

    // the path, where the vectors will be stored to
    val vectorsPath = new Path("job/vectors")
    // enumeration of all users involved in a selected subset of hand history records
    val dictionaryPath = new Path("job/dictionary")
    // text file with the dictionary size
    val dictionarySizePath = new Path("job/dictionary-size")
    // indexed dictionary (every user ID in the dictionary is mapped to an index, from 0)
    val indexedDictionaryPath = new Path("job/indexed-dictionary")

    println("Building dictionary...")
    // extracts IDs of all the users, participating in selected subset of hand history records
    Tool.main(Array(classOf[Dictionary.Builder].getName, "--hdfs",
      "--hbasehost", "localhost", "--output", dictionaryPath.toString))
    // adds index to the dictionary
    Tool.main(Array(classOf[Dictionary.Indexer].getName, "--hdfs",
      "--input", dictionaryPath.toString, "--output", indexedDictionaryPath.toString))
    // calculates dictionary size, and stores it to the FS
    Tool.main(Array(classOf[Dictionary.Size].getName, "--hdfs",
      "--input", dictionaryPath.toString, "--output", dictionarySizePath.toString))

    // reads dictionary size
    val fs = FileSystem.get(dictionaryPath.toUri, conf)
    val dictionarySize = new BufferedReader(
      new InputStreamReader(fs.open(new Path(dictionarySizePath, "part-00000")))).readLine().toInt

    // builds vectors (player -> other players in the game)
    // IDs of other players (in the vectors) are replaced with indices, taken from dictionary
    Tool.main(Array(classOf[VectorBuilder].getName, "--hdfs",
      "--dictionary", dictionaryPath.toString, "--hbasehost", "localhost",
      "--output", vectorsPath.toString, "--dictionarySize", dictionarySize.toString))

Stage 4. Generating n random clusters

Random clusters/centroids are the entry point for the K-means algorithm:

    // randomly selected clusters that will be passed as an input to K-means
    val inputClustersPath = new Path("job/input-clusters")
    val distanceMeasure = new EuclideanDistanceMeasure
    println("Making random seeds...")
    // build 30 initial random clusters/centroids
    RandomSeedGenerator.buildRandom(conf, vectorsPath, inputClustersPath, 30, distanceMeasure)
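
Conceptually, seeding amounts to picking k of the input vectors at random and using them as the initial centroids. A minimal plain-Scala sketch of that idea follows; the names are hypothetical and the code does not reproduce how Mahout samples from a sequence file.

```scala
import scala.util.Random

object RandomSeedsSketch {
  // picks k of the input points (at distinct positions) to serve as initial centroids
  def randomSeeds[A](points: Seq[A], k: Int, random: Random): Seq[A] =
    random.shuffle(points).take(k)
}
```

Passing a `Random` with a fixed seed, for instance `new Random(42)`, makes the selection repeatable between runs, which helps when comparing clustering results.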

Stage 5. Running the K-means algorithm

With every iteration, K-means finds better centroids and clusters. As a result, we get 30 clusters of players that played with each other most often:

    // clusterization results
    val outputClustersPath = new Path("job/output-clusters")
    // textual dump of clusterization results
    val dumpPath = "job/dump"

    println("Running K-means...")
    // runs K-means algorithm with up to 20 iterations, to find clusters of colluding players (assumption of collusion is
    // made on the basis of the number of hands played together with other player[s])
    KMeansDriver.run(conf, vectorsPath, inputClustersPath, outputClustersPath,
      new CosineDistanceMeasure(), 0.01, 20, true, 0, false)

    println("Printing results...")

    // dumps clusters to a text file
    val clusterizationResult = finalClusterPath(conf, outputClustersPath, 20)
    val clusteredPoints = new Path(outputClustersPath, "clusteredPoints")
    val clusterDumper = new ClusterDumper(clusterizationResult, clusteredPoints)
    clusterDumper.setTermDictionary(new Path(indexedDictionaryPath, "part-00000").toString,
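
The K-means run above uses a cosine distance measure. Cosine distance compares the direction of two vectors rather than their length, so a player who simply plays a lot is not pushed away from players with the same set of opponents. The plain-Scala sketch below shows the computation on sparse vectors modelled as maps; the names are hypothetical, and returning 1.0 for a zero vector is a choice made for this sketch only.

```scala
object CosineSketch {
  // dictionary index of the other player -> number of hands played together
  type SparseVector = Map[Int, Double]

  def dot(a: SparseVector, b: SparseVector): Double =
    a.iterator.map { case (idx, weight) => weight * b.getOrElse(idx, 0.0) }.sum

  def norm(a: SparseVector): Double = math.sqrt(dot(a, a))

  // 1 - cosine similarity: about 0 for vectors with the same direction, 1 for vectors with no shared features
  def cosineDistance(a: SparseVector, b: SparseVector): Double = {
    val denominator = norm(a) * norm(b)
    if (denominator == 0.0) 1.0 // assumption of this sketch: a zero vector is treated as maximally distant
    else 1.0 - dot(a, b) / denominator
  }
}
```

With the vectors built in Stage 3, players 1003 and 1006 share three opponents, so their distance is 25/73 (about 0.34), while 1003 and 1004 share none and are at distance 1.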


Let’s look at “job/dump” now: this file contains textual dumps of all the clusters generated by K-means. Here’s a small fragment of the file:

 VL-0{n=5 c=[1003:3.400, 1006:3.400, 1008:3.200, 1009:3.200, 1012:3.200] r=[1003:1.744, 1006:1.744, 1008:1.600, 1009:1.600, 1012:1.600]}
  Top Terms: 
   1006                                    =>                 3.4
   1003                                    =>                 3.4
   1012                                    =>                 3.2
   1009                                    =>                 3.2
   1008                                    =>                 3.2
 VL-15{n=1 c=[1016:4.000, 1019:3.000, 1020:3.000, 1021:3.000, 1022:3.000, 1023:3.000, 1024:3.000, 1025:3.000] r=[]}
  Top Terms: 
   1016                                    =>                 4.0
   1025                                    =>                 3.0
   1024                                    =>                 3.0
   1023                                    =>                 3.0
   1022                                    =>                 3.0
   1021                                    =>                 3.0
   1020                                    =>                 3.0
   1019                                    =>                 3.0

As we can see, 2 clusters of players have been detected: one with 8 players that have played a lot of games with each other, and a second one with 5 players.

Reference: Poker collusion detection with Mahout and Scalding from our JCG partner Vasil Remeniuk at the Vasil Remeniuk blog.
