Mahout and Scalding for poker collusion detection

While I was reading Mahout in Action, a very good book on Mahout (and a great hands-on intro to machine learning as well), one of the examples caught my attention.
The authors of the book used the well-known K-means clustering algorithm to find similar users on stackoverflow.com, where the criterion of similarity was the set of authors whose questions/answers the users were up-/downvoting.

In very simple terms, the K-means algorithm iteratively finds clusters of points/vectors that are located close to each other in a multidimensional space. Applied to the problem of finding similar users on Stack Overflow, we assume that every axis in the multidimensional space is a user, and the distance from zero along that axis is the sum of points awarded to the questions/answers given by that user (these dimensions are often called “features”, and the distance is the “feature weight”).
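
To make the iteration concrete, here is a minimal, in-memory sketch of K-means in plain Scala. It is not Mahout's implementation: it assumes dense vectors, Euclidean distance, caller-supplied initial centroids and a fixed number of iterations, and the names (`KMeansSketch`, `run`, `nearest`) are made up for illustration.

```scala
object KMeansSketch {
  type Point = Vector[Double]

  // Euclidean distance between two points of the same dimension
  def distance(a: Point, b: Point): Double =
    math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)

  // Component-wise mean of a non-empty group of points
  def mean(points: Seq[Point]): Point =
    points.transpose.map(_.sum / points.size).toVector

  // Index of the centroid closest to the given point
  def nearest(point: Point, centroids: Seq[Point]): Int =
    centroids.indices.minBy(i => distance(point, centroids(i)))

  // Repeats the two K-means steps a fixed number of times:
  // 1) assign every point to its nearest centroid, 2) move each centroid to the mean of its points
  def run(points: Seq[Point], initial: Seq[Point], iterations: Int): Seq[Point] =
    (1 to iterations).foldLeft(initial) { (centroids, _) =>
      val assigned = points.groupBy(p => nearest(p, centroids))
      centroids.indices.map { i =>
        // a centroid with no assigned points stays where it was
        assigned.get(i).map(mean).getOrElse(centroids(i))
      }
    }
}
```

Calling `KMeansSketch.run(points, initialCentroids, 5)` returns the adjusted centroids; a real driver would also stop early once the centroids move less than some convergence threshold.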

Obviously, the same approach can be applied to one of the most sophisticated problems in massively multiplayer online poker: collusion detection. We make a simple assumption: if two or more players have played too many games with each other (taking into account that any of them could simply be an active player who plays a lot of games with everyone), they might be colluding.

We break a massive set of players into small, tight clusters (preferably with 2-8 players in each) using the K-means clustering algorithm. In the basic implementation that we walk through below, every player is represented as a vector whose axes are the other players she has played with (and the weight of each feature is the number of games played together).

Stage 1. Building a dictionary

As the first step, we need to build a dictionary/enumeration of all the players involved in the subset of hand history that we analyze:

    
  // extract user ID from hand history record
  val userId = (playerHistory: PlayerHandHistory) =>
    new Text(playerHistory.getUserId.toString)

  // Builds a basic dictionary (an enumeration, in fact) of all the players who participated in the selected subset
  // of hand history records
  class Builder(args: Args) extends Job(args) {

    // input tap is an HTable with hand history entries: hand history id -> hand history record, serialized with ProtoBuf
    val input = new HBaseSource("hand", args("hbasehost"), 'handId, Array("d"), Array('blob))
    // output tap - plain text file with player IDs
    val output = TextLine(args("output"))

    input
      .read
      .flatMap('blob -> 'player) {
      // every hand history record contains the list of players who participated in the hand
      blob: Array[Byte] => // at the first stage, we simply extract the list of IDs and add them to the flat list
        HandHistory.parseFrom(blob).getPlayerList.map(userId)
    }
      .unique('player) // remove duplicate user IDs
      .project('player) // leave only 'player column from the tuple
      .write(output)

  }
   
 1003
 1004
 1005
 1006
 1007
 ...

Stage 2. Adding indices to the dictionary

Next, we map each user ID to the position/index of that player in the vector.

   
  class Indexer(args: Args) extends Job(args) {

    val output = WritableSequenceFile(args("output"), classOf[Text], classOf[IntWritable],
      'userId -> 'idx)

    TextLine(args("input")).read
      .map(('offset -> 'line) -> ('userId -> 'idx)) {
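      // dictionary lines are read from the TextLine source together with an 'offset field, out of the box.
      // In my case, the offsets were multiples of 5 (most likely because 'offset is the byte offset of the line,
      // and each 4-digit ID plus a newline takes 5 bytes), so I had to divide them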
      tuple: (Int, String) => (new Text(tuple._2.toString) -> new IntWritable((tuple._1 / 5)))
    }
      .project(('userId -> 'idx)) // only userId -> index tuple is passed to the output
      .write(output)

  }
 
 1003 0
 1004 1
 1005 2
 1006 3
 1007 4
 ...
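
The Indexer above derives the index by dividing the 'offset field by 5. The plain-Scala sketch below illustrates one likely explanation, under the assumption that 'offset is the byte offset at which each line starts (single-byte characters, one-byte newline). It also shows why a fixed divisor only works while every ID has the same length, and how a position-based index avoids that. All names here are hypothetical and not part of Scalding.

```scala
object DictionaryIndexSketch {
  // Byte offset at which each line starts, assuming a one-byte '\n' terminator
  // and single-byte characters (an assumption about what 'offset contains)
  def byteOffsets(lines: Seq[String]): Seq[Int] =
    lines.scanLeft(0)((offset, line) => offset + line.length + 1).init

  // Index derived the way the Indexer job does it: offset divided by a fixed record width
  def indexByOffset(lines: Seq[String], recordWidth: Int): Seq[(String, Int)] =
    lines.zip(byteOffsets(lines).map(_ / recordWidth))

  // Index that does not depend on the length of the IDs
  def indexByPosition(lines: Seq[String]): Seq[(String, Int)] =
    lines.zipWithIndex
}
```

With the 4-digit IDs from the dictionary, both functions agree (with a record width of 5). As soon as IDs of different lengths appear, for example "999" followed by "1000", the offset-based variant assigns the same index to two players, so a distributed job would need another way to number the lines.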
 

Stage 3. Building vectors

We build the vectors that will be passed as input to the K-means clustering algorithm. As we noted above, every position in the vector corresponds to another player that the player has played with:

 
/**
 * The K-means clustering algorithm requires the input to be represented as vectors.
 * In our case, the vector itself represents the player, and the other users the player has played with are
 * the vector axes/features (the weight of a feature is the number of games played together)
 * User: remeniuk
 */
class VectorBuilder(args: Args) extends Job(args) {

  import Dictionary._

  // initializes dictionary pipe
  val dictionary = TextLine(args("dictionary"))
    .read
    .map(('offset -> 'line) -> ('userId -> 'dictionaryIdx)) {
    tuple: (Int, String) =>
      (tuple._2 -> tuple._1 / 5)
  }
    .project(('userId -> 'dictionaryIdx))

  val input = new HBaseSource("hand", args("hbasehost"), 'handId, Array("d"), Array('blob))
  val output = WritableSequenceFile(args("output"), classOf[Text], classOf[VectorWritable],
    'player1Id -> 'vector)

  input
    .read
    .flatMap('blob -> ('player1Id -> 'player2Id)) {
    // builds a flat list of pairs of users who played together
    blob: Array[Byte] =>
      val playerList = HandsHistoryCoreInternalDomain.HandHistory.parseFrom(blob).getPlayerList.map(userId)
      playerList.flatMap {
        playerId =>
          playerList.filterNot(_ == playerId).map(otherPlayerId => (playerId -> otherPlayerId.toString))
      }
  }
    // joins the list of pairs of users who played together with the dictionary, so that the second member
    // of the tuple (ID of the second player) is replaced with its index in the dictionary
    .joinWithSmaller('player2Id -> 'userId, dictionary)
    .groupBy('player1Id -> 'dictionaryIdx) {
    group => group.size // groups pairs of players who played together, counting the number of hands
  }
    .map(('player1Id, 'dictionaryIdx, 'size) ->('playerId, 'partialVector)) {
    tuple: (String, Int, Int) =>
      // turns a tuple of two users into a vector with one feature
      val partialVector = new NamedVector(
        new SequentialAccessSparseVector(args("dictionarySize").toInt), tuple._1)
      partialVector.set(tuple._2, tuple._3)
      (new Text(tuple._1), new VectorWritable(partialVector))
  }
    .groupBy('player1Id) {
    // combines partial vectors into one vector that represents the number of hands played with other players
    group => group.reduce('partialVector -> 'vector) {
      (left: VectorWritable, right: VectorWritable) =>
        new VectorWritable(left.get.plus(right.get))
    }
  }
    .write(output)

}
 
 1003 {3:5.0,5:4.0,6:4.0,9:4.0}
 1004 {2:4.0,4:4.0,8:4.0,37:4.0}
 1005 {1:4.0,4:5.0,8:4.0,37:4.0}
 1006 {0:5.0,5:4.0,6:4.0,9:4.0}
 1007 {1:4.0,2:5.0,8:4.0,37:4.0}
 ...
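
The same transformation can be followed on a small in-memory example. The sketch below is plain Scala without Scalding or Mahout. It assumes that a hand has already been reduced to the list of player IDs seated in it, and it represents a sparse vector as a `Map` from dictionary index to weight. The object and function names are hypothetical.

```scala
object CoPlayVectorsSketch {
  // A hand reduced to the IDs of the players seated in it (a simplification for this sketch)
  type Hand = Seq[String]

  // All ordered pairs (player, otherPlayer) within one hand
  def pairs(hand: Hand): Seq[(String, String)] =
    for (p <- hand; other <- hand if other != p) yield (p, other)

  // player -> sparse vector (dictionary index of the other player -> hands played together)
  def vectors(hands: Seq[Hand], dictionary: Map[String, Int]): Map[String, Map[Int, Double]] =
    hands.flatMap(pairs)
      .groupBy { case (player, _) => player }
      .map { case (player, playerPairs) =>
        val counts = playerPairs
          .groupBy { case (_, other) => dictionary(other) }
          .map { case (idx, group) => idx -> group.size.toDouble }
        player -> counts
      }
}
```

For two hands, one with players 1003 and 1004 and one with 1003, 1004 and 1005, player 1003 ends up with the vector `{1:2.0, 2:1.0}` when the dictionary maps 1003, 1004 and 1005 to 0, 1 and 2.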

The entire workflow required to vectorize the input:

     
    val conf = new Configuration
    conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
      + "org.apache.hadoop.io.serializer.WritableSerialization")

    // the path, where the vectors will be stored to
    val vectorsPath = new Path("job/vectors")
    // enumeration of all users involved in a selected subset of hand history records
    val dictionaryPath = new Path("job/dictionary")
    // text file with the dictionary size
    val dictionarySizePath = new Path("job/dictionary-size")
    // indexed dictionary (every user ID in the dictionary is mapped to an index, from 0)
    val indexedDictionaryPath = new Path("job/indexed-dictionary")

    println("Building dictionary...")
    // extracts IDs of all the users, participating in selected subset of hand history records
    Tool.main(Array(classOf[Dictionary.Builder].getName, "--hdfs",
      "--hbasehost", "localhost", "--output", dictionaryPath.toString))
    // adds index to the dictionary
    Tool.main(Array(classOf[Dictionary.Indexer].getName, "--hdfs",
      "--input", dictionaryPath.toString, "--output", indexedDictionaryPath.toString))
    // calculates dictionary size, and stores it to the FS
    Tool.main(Array(classOf[Dictionary.Size].getName, "--hdfs",
      "--input", dictionaryPath.toString, "--output", dictionarySizePath.toString))

    // reads dictionary size
    val fs = FileSystem.get(dictionaryPath.toUri, conf)
    val dictionarySize = new BufferedReader(
      new InputStreamReader(
        fs.open(new Path(dictionarySizePath, "part-00000"))
      )).readLine().toInt

    println("Vectorizing...")
    // builds vectors (player -> other players in the game)
    // IDs of other players (in the vectors) are replaced with indices taken from the dictionary
    Tool.main(Array(classOf[VectorBuilder].getName, "--hdfs",
      "--dictionary", dictionaryPath.toString, "--hbasehost", "localhost",
      "--output", vectorsPath.toString, "--dictionarySize", dictionarySize.toString))

Stage 4. Generating n random clusters

Random clusters/centroids are the starting point for the K-means algorithm:

      
    // randomly selected clusters that will be passed as an input to K-means
    val inputClustersPath = new Path("job/input-clusters")
    val distanceMeasure = new EuclideanDistanceMeasure

    println("Making random seeds...")
    // build 30 initial random clusters/centroids
    RandomSeedGenerator.buildRandom(conf, vectorsPath, inputClustersPath, 30, distanceMeasure)
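
Conceptually, seeding amounts to picking k of the input vectors at random and using them as the initial centroids. The sketch below shows that idea in plain Scala. It is an in-memory illustration with hypothetical names, not a description of how RandomSeedGenerator is implemented.

```scala
import scala.util.Random

object RandomSeedsSketch {
  // Picks k distinct input vectors (by position) to serve as the initial centroids.
  // Passing the random generator in makes the selection reproducible.
  def pick[A](vectors: Seq[A], k: Int, rng: Random): Seq[A] =
    rng.shuffle(vectors).take(k)
}
```

Because the result of K-means depends on where the centroids start, running with a different seed can produce different clusters.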

Stage 5. Running the K-means algorithm

With every iteration, K-means finds better centroids and clusters. As a result, we get 30 clusters of players who played with each other most often:

      
    // clusterization results
    val outputClustersPath = new Path("job/output-clusters")
    // textual dump of clusterization results
    val dumpPath = "job/dump"

    println("Running K-means...")
    // runs the K-means algorithm with up to 20 iterations to find clusters of colluding players (the assumption
    // of collusion is made on the basis of the number of hands played together with other players)
    KMeansDriver.run(conf, vectorsPath, inputClustersPath, outputClustersPath,
      new CosineDistanceMeasure(), 0.01, 20, true, 0, false)

    println("Printing results...")

    // dumps clusters to a text file
    val clusterizationResult = finalClusterPath(conf, outputClustersPath, 20)
    val clusteredPoints = new Path(outputClustersPath, "clusteredPoints")
    val clusterDumper = new ClusterDumper(clusterizationResult, clusteredPoints)
    clusterDumper.setNumTopFeatures(10)
    clusterDumper.setOutputFile(dumpPath)
    clusterDumper.setTermDictionary(new Path(indexedDictionaryPath, "part-00000").toString,
      "sequencefile")
    clusterDumper.printClusters(null)
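
The K-means run above uses a cosine distance measure. The sketch below, in plain Scala with hypothetical names, shows why that choice fits the earlier concern about very active players. Cosine distance (taken here as 1 minus the cosine of the angle between two vectors) depends only on the proportions between features, while Euclidean distance also grows with the total number of hands played. Sparse vectors are modelled as `Map[Int, Double]` and are assumed to be non-zero.

```scala
object DistanceSketch {
  type SparseVector = Map[Int, Double]

  // Dot product over the indices present in both vectors
  def dot(a: SparseVector, b: SparseVector): Double =
    a.collect { case (i, x) if b.contains(i) => x * b(i) }.sum

  def norm(a: SparseVector): Double = math.sqrt(dot(a, a))

  // 1 - cos(angle between a and b); close to 0 when the vectors point the same way.
  // Both vectors must be non-zero, otherwise the result is NaN.
  def cosineDistance(a: SparseVector, b: SparseVector): Double =
    1.0 - dot(a, b) / (norm(a) * norm(b))

  def euclideanDistance(a: SparseVector, b: SparseVector): Double =
    math.sqrt((a.keySet ++ b.keySet).toSeq.map { i =>
      val d = a.getOrElse(i, 0.0) - b.getOrElse(i, 0.0)
      d * d
    }.sum)
}
```

Two players who share the same opponents in the same proportions, one with 4 and 2 hands and the other with 40 and 20, are practically at cosine distance zero, even though the Euclidean distance between them is large.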

Results

Let’s now look at “job/dump”: this file contains textual dumps of all the clusters generated by K-means. Here’s a small fragment of the file:

 VL-0{n=5 c=[1003:3.400, 1006:3.400, 1008:3.200, 1009:3.200, 1012:3.200] r=[1003:1.744, 1006:1.744, 1008:1.600, 1009:1.600, 1012:1.600]}
  Top Terms: 
   1006                                    =>                 3.4
   1003                                    =>                 3.4
   1012                                    =>                 3.2
   1009                                    =>                 3.2
   1008                                    =>                 3.2
 
 VL-15{n=1 c=[1016:4.000, 1019:3.000, 1020:3.000, 1021:3.000, 1022:3.000, 1023:3.000, 1024:3.000, 1025:3.000] r=[]}
  Top Terms: 
   1016                                    =>                 4.0
   1025                                    =>                 3.0
   1024                                    =>                 3.0
   1023                                    =>                 3.0
   1022                                    =>                 3.0
   1021                                    =>                 3.0
   1020                                    =>                 3.0
   1019                                    =>                 3.0

As we can see, 2 clusters of players have been detected: one with 8 players who have played a lot of games with each other, and a second one with 4 players.

Reference: Poker collusion detection with Mahout and Scalding from our JCG partner Vasil Remeniuk at the Vasil Remeniuk blog.

One Response to "Mahout and Scalding for poker collusion detection"

  1. Opvistar says:

    Stack Overflow is a question and answer site for professional and enthusiast programmers. It’s 100% free, no registration required.

    It doesn´t seem that stackoverflow is a poker gaming site…

    Op

Java Code Geeks and all content copyright © 2010-2014, Exelixis Media Ltd | Terms of Use | Privacy Policy | Contact