Deep reinforcement learning for checkers -- pretraining a policy

This post discusses an approach to approximating a checkers-playing policy using a neural network trained on a database of human play. It is the first post in a series covering the engine behind checkers.ai, which uses a combination of supervised learning, reinforcement learning, and game tree search to play checkers. This set of approaches is based on AlphaGo (see this paper). Unlike Go, checkers has a known optimal policy, found using exhaustive tree search and endgame databases (see this paper). Although Chinook has already solved checkers, the game is still very complex, with over 5 x 10^20 states, which makes it an interesting application of deep reinforcement learning. To give you an idea, it took Jonathan Schaeffer and his team at the University of Alberta 18 years (1989-2007) to search the game tree end to end. Here I will discuss how we can use a database of expert human moves to pretrain a policy, which will be the building block of the engine. I’ll show that a pretrained policy can beat intermediate/advanced human players as well as some of the popular online checkers engines. In later posts we’ll take this policy and improve it through self-play.

Problem statement

In reinforcement learning, the goal is to map states, or best estimates of states, to actions that achieve some set of objectives, optimally or near-optimally. States represent an environment, and actions are taken in that environment so as to maximize some reward function. States can be continuous or discrete. Continuous state spaces are encountered in robotics settings, whereas for computer board games we use discrete state spaces to represent the environment. In both cases the policy is defined as the mapping from states to actions, and is often represented by the conditional probability distribution \( p(action | state) \). Unlike many real-world reinforcement learning problems, checkers is a game of perfect information, so we don’t need to infer or estimate the state. This greatly reduces the complexity of the problem, but the state space is still far too large to store on any reasonably sized/priced memory or disk hardware. The goal is therefore to use a parametric model \( p(action | state, \theta) \), where \( \theta \) represents the model’s trainable parameters, to learn good strategy. To initialize such a policy with good checkers-playing behavior, we can train it on human expert moves. This is a supervised learning problem, where we use examples in a database drawn from the joint probability distribution \( p(\mathrm{expert\ human\ action}, \mathrm{board}) \) to train our neural network policy \( p(action | state, \theta) \).

Training database

The first task is to identify a database of moves on which to train the policy. One option would be to mine data from online checkers websites. While this could be done, it would require a lot of data preprocessing. Although online game websites collect a lot of data, one issue is that the quality of play varies widely, making the data high variance. We instead prefer to train our engine only on expert play. Luckily, Martin Fierz has an extensive database of ~22k expert-level games, which can be found here. This database consists of professional checkers tournament play that took place in 19th-century Great Britain. The database was meticulously transcribed (by hand) with very few errors. We’re going to use this dataset. The database is a text file consisting of a cascade of games encoded as follows:

[Event "Manchester 1841"]
[Date "1841-??-??"]
[Black "Moorhead, W."]
[White "Wyllie, J."]
[Site "Manchester"]
[Result "0-1"]
1. 11-15 24-20 2. 8-11 28-24 3. 9-13 22-18 4. 15x22 25x18 5. 4-8 26-22 6. 10-14
18x9 7. 5x14 22-18 8. 1-5 18x9 9. 5x14 29-25 10. 11-15 24-19 11. 15x24 25-22 12.
24-28 22-18 13. 6-9 27-24 14. 8-11 24-19 15. 7-10 20-16 16. 11x20 18-15 17. 2-6
15-11 18. 12-16 19x12 19. 10-15 11-8 20. 15-18 21-17 21. 13x22 30-26 22. 18x27
26x17x10x1 0-1

The parser.py script in my GitHub repository walks through this file and saves each of its 440k+ board-move pairs into a dict. The dict is then serialized (pickled) for later use.
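To give a flavor of what the parser does, here is a minimal sketch of turning one PDN move list into (start, end) square pairs. This is not the actual parser.py, which also replays the moves on a board to produce the board-move training pairs; the regex and the shift from PDN’s 1-32 numbering to the 0-31 indexing used later are my own assumptions.

import re

def parse_pdn_moves(move_text):
    """Turn a PDN move list into (start, end) square pairs.

    PDN squares are numbered 1-32; shift to 0-31 to match the board
    indexing used by the engine. A multi-jump such as '26x17x10x1'
    expands into consecutive hops.
    """
    # Drop the trailing result marker (1-0, 0-1, or 1/2-1/2)
    move_text = re.sub(r'(1-0|0-1|1/2-1/2)\s*$', '', move_text.strip())
    moves = []
    for token in re.findall(r'\d+(?:[x-]\d+)+', move_text):
        squares = [int(s) - 1 for s in re.split(r'[x-]', token)]
        moves.extend(zip(squares, squares[1:]))
    return moves

print(parse_pdn_moves("1. 11-15 24-20 2. 8-11 28-24 3. 9-13 22-18 4. 15x22 25x18 0-1"))
# [(10, 14), (23, 19), (7, 10), (27, 23), (8, 12), (21, 17), (14, 21), (24, 17)]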

Model architecture

To encode the board state, I use the map (-3: opponent king, -1: opponent checker, 0: empty, 1: own checker, 3: own king) to represent pieces on the board. Other values might work just as well; as a general rule, the entries should be roughly equidistant and well away from machine precision. The input layer of the neural network, \( x \), can be either an 8x4 representation (for a CNN) or 32x1 (for a feedforward NN), depending on our choice of network. In order to capture both highly localized features (e.g., an available jump) and higher-level patterns, such as multiple jumps, proximity to kings row, and piece differential, I feed the input into three separate layers: a convolutional layer with a 2x2 kernel, a convolutional layer with a 3x3 kernel, and a fully connected feedforward layer. These networks are first trained independently on the dataset; their last hidden layers are then concatenated and fed into two dense layers that in turn map to a softmax output. After concatenation of the individual nets, the full network is trained end to end on the same database. The figure below illustrates the network architecture.
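To make the board encoding concrete, here is a minimal sketch of the opening position as the side to move would see it. The layout follows the square numbering shown in the Game class docstring further down; the constant names here are illustrative rather than the ones used in the repository.

import numpy as np

# Piece encoding: -3 opponent king, -1 opponent checker, 0 empty, 1 own checker, 3 own king
OPP_KING, OPP_CHKR, EMPTY, OWN_CHKR, OWN_KING = -3, -1, 0, 1, 3

# Opening position from the mover's perspective: the opponent's 12 checkers occupy
# squares 0-11, squares 12-19 are empty, and our 12 checkers occupy squares 20-31.
board = np.array([OPP_CHKR] * 12 + [EMPTY] * 8 + [OWN_CHKR] * 12, dtype=np.int32)

cnn_input = board.reshape(8, 4)   # 8x4 input for the convolutional branches
ff_input = board.reshape(32, 1)   # 32x1 input for the feedforward branch
print(cnn_input)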

The output layer (\( h(x) \)) consists of a location and a direction, represented by a 32x4 matrix with binary entries, which we then unravel into a 128x1 one-hot encoded output. You’ll note that this produces a total of 30 invalid moves at edge and corner positions; our model will need to learn this. The State class definition is given in state.py. The board object stores board states for both players during the game by switching the input positions for the ‘black’ pieces (i.e., by rotating the board 180 degrees). If you run parser.py (after downloading the raw text file, OCA_2.0.pdn), it will save a .pickle file into the local directory, which you will need for the training step.
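Concretely, the 32x4 (position x direction) matrix is flattened in C-order, so a move’s index in the 128-way one-hot vector is 4 * position + direction. A small sketch of the round trip, mirroring the reshape(32, 4) logic used in state.py:

import numpy as np

def encode_action(position, direction):
    """One-hot encode a (position, direction) move as a 128-vector (C-order flattening)."""
    onehot = np.zeros(128, dtype=np.int32)
    onehot[4 * position + direction] = 1
    return onehot

def decode_action(onehot):
    """Recover (position, direction) from the 128-way one-hot vector."""
    position, direction = np.argwhere(onehot.reshape(32, 4) == 1)[0]
    return int(position), int(direction)

move = encode_action(position=22, direction=0)
print(decode_action(move))  # (22, 0)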

The convolutional neural network that I chose for this task is based on the Inception architecture, with multiple convolutional layers, each with multiple feature maps of varying size. This allows the network to learn from both the local and the global features that are relevant to the decision. It accepts an 8x4 input (\( x \)) and returns a 128-dimensional output (\( h(x) \)).

The algebraic form of the convolution operation, which maps layers \( l=0 \) and \( l=1 \) to layers \( l=1 \) and \( l=2 \), respectively, is given by:

\begin{equation} a_{mij}^{l} = \left( \textbf{w}_{m}^{l} * \textbf{z}_{m}^{* \ l-1} + \textbf{b}_{m}^{l} \right)_{ij} = b_{mij}^{l} + \sum_{n}\sum_{q} z_{m, \ i-n, \ j-q}^{* \ l-1}w_{mnq}^{l} \end{equation}

where \( a_{mij}^{l} \) represents element \( \left(i,j\right) \) of the \( m^{th} \) feature map in layer \( l \), \( \textbf{w}_{m}^{l} \) is the kernel (2x2 or 3x3) corresponding to the \( m^{th} \) feature map in layer \( l \), \( \textbf{b}_{m}^{l} \) is the bias corresponding to the \( m^{th} \) feature map in layer \( l \), and \( \textbf{z}_{m}^{l-1} \) is equal to \( \textbf{x} \) for \( l=1 \), and to \( \max(0, a_{mij}^{l-1}) \) (i.e., the rectified linear unit) for \( l=2 \). The operator * denotes the convolution between the kernel and its input, while \( \textbf{z}^{*} \) denotes a matrix representation of \( \textbf{z} \) that has been padded with a box of zeros around its perimeter. Padding is often used to preserve information at the corners and edges of the input space. Here we use ‘same’ padding along with a kernel stride of (1,1).
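To make the convolution above concrete, here is a toy numpy version of a single 2x2, stride-(1,1), ‘same’-padded convolution over the 8x4 board. Like tf.nn.conv2d, it computes a cross-correlation (the kernel flip implied by the i-n, j-q indexing is a notational detail), and the kernel values are arbitrary.

import numpy as np

def conv2d_same(x, w, b):
    """Naive 'same' convolution with stride (1, 1), in the spirit of the expression above.

    x: (H, W) input plane, w: (kh, kw) kernel, b: scalar bias. For a 'same'
    output size with an even kernel, TensorFlow pads on the bottom/right only,
    and that convention is reproduced here.
    """
    kh, kw = w.shape
    xp = np.pad(x, ((0, kh - 1), (0, kw - 1)))   # zero padding
    out = np.zeros_like(x, dtype=float)
    for i in range(x.shape[0]):
        for j in range(x.shape[1]):
            out[i, j] = b + np.sum(xp[i:i + kh, j:j + kw] * w)
    return out

board = np.random.randint(-3, 4, size=(8, 4)).astype(float)
kernel = np.random.randn(2, 2)
z = np.maximum(0.0, conv2d_same(board, kernel, b=0.1))   # ReLU activation
print(z.shape)   # (8, 4): one feature map the same size as the input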

The next operation in the network vectorizes \( \textbf{z}_{m}^{\ l=2} \), unravelling it by row, column, and then feature map (i.e., C-order). Two fully connected mappings are then applied in succession to yield the output at layer \( l=4 \). The algebraic form of this mapping is given by:

\begin{equation} \textbf{z}^{l} = \sigma^{l} \left( \textbf{w}^{l} \textbf{z}^{\ l-1} + \textbf{b}^{\ l} \right) \end{equation}

where \( \sigma^{l=3} \) is a rectified linear unit, and \( \sigma^{l=4} \) is the softmax function given by:

\begin{equation} \sigma^{l=4}(z^{l=4})_{i} = \frac{e^{z_{i}^{l=4}}}{\sum_{j=0}^{n-1} e^{z_{j}^{l=4}}} \end{equation}

The softmax function is a normalized exponential of the outputs from layer 4, and thus can be interpreted as a probability distribution over the output space. The model prediction is given by:

\begin{equation} \mathsf{predicted \ move} = argmax\left( \sigma^{l=4}(\textbf{z}^{l=4}) \right) \end{equation}

where \( n = 128 \), the number of output labels.
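The dense mappings, softmax, and argmax above amount to two affine maps with a ReLU in between, a normalized exponential, and an argmax. A minimal numpy sketch with made-up layer sizes (the 16 feature maps and 256-unit hidden layer are assumptions for illustration):

import numpy as np

def relu(a):
    return np.maximum(0.0, a)

def softmax(a):
    e = np.exp(a - np.max(a))   # subtract the max for numerical stability
    return e / np.sum(e)

rng = np.random.default_rng(0)
z2 = rng.standard_normal(8 * 4 * 16)              # flattened conv output (16 feature maps assumed)
w3, b3 = rng.standard_normal((256, z2.size)), np.zeros(256)
w4, b4 = rng.standard_normal((128, 256)), np.zeros(128)

z3 = relu(w3 @ z2 + b3)                           # first dense layer, sigma = ReLU
z4 = softmax(w4 @ z3 + b4)                        # second dense layer, sigma = softmax
predicted_move = int(np.argmax(z4))               # predicted move = argmax over the 128 outputs
print(predicted_move, z4.sum())                   # index in [0, 128); probabilities sum to 1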

Training the model

In order to train efficiently, it is important to choose an appropriate loss function (\( L \)). Here I use categorical cross-entropy with L2 regularization, computed as follows:

\begin{equation} L(h(x)) = -\frac{1}{n}\sum_{x} \sum_{i} y_{i} \ln h_{i}(x) + \lambda\left( \sum_{l=1}^{2} \sum_{m=0}^{M} \lVert \textbf{w}_{m}^{l} \rVert^{2} + \sum_{l=3}^{4} \lVert \textbf{w}^{l} \rVert^{2} \right) \end{equation}

where \( y_{i} \) denotes the \( i^{th} \) component of the one-hot ground-truth label, and \( \lambda \) is the regularization constant.
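For a single example, this loss is just the negative log-probability the model assigns to the expert’s move plus an L2 penalty on the weights. A minimal numpy sketch (in the actual graph this is computed with tf.nn.softmax_cross_entropy_with_logits_v2 and tf.nn.l2_loss; the numbers below are arbitrary):

import numpy as np

def cross_entropy_l2(probs, label_onehot, weights, lam):
    """Categorical cross-entropy for one example plus an L2 penalty on the weights."""
    ce = -np.sum(label_onehot * np.log(probs + 1e-12))    # -sum_i y_i ln h_i(x)
    reg = lam * sum(np.sum(w ** 2) for w in weights)      # lambda * sum_l ||w^l||^2
    return ce + reg

probs = np.full(128, 1.0 / 128)   # an untrained (uniform) policy
label = np.zeros(128)
label[45] = 1                     # expert move index (arbitrary)
print(cross_entropy_l2(probs, label, weights=[np.ones((4, 4))], lam=1e-4))
# ~4.85, i.e. ln(128) plus a small regularization term

Here is the Python code that implements the model: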

import os

import tensorflow as tf
from tensorflow.python.framework.graph_util import convert_variables_to_constants

# Hyperparameters and helpers such as POLICY_PATH, PARAM_INIT, KERNEL_SIZES,
# N_KERNELS, INCEP_ACT, HWY_LAYERS, HWY_BIAS, LAMBDA, and the info() logger are
# defined elsewhere in the repository and assumed to be imported here.


class Model:

    os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
    _MODEL_DEF_TEMPLATE = {'batch_size': 1, 'dropout': False, 'init': None}

    def __init__(self, name, load_dir, trainable, device):
        self.name = name
        self.trainable = trainable
        self.load_dir = load_dir
        self.write_dir = os.path.join(POLICY_PATH, 'tmp/')
        self._vars = []
        self._ops = []
        self.scopes = []
        self._device = device

    def _build_graph(self):
        pass

    def _constructor(self):
        with tf.variable_scope(self.name, reuse=tf.AUTO_REUSE):
            try:
                with tf.device(self._device):
                    self._build_graph()
            except AttributeError:
                self._build_graph()

    def init(self, session, load_dir=None):
        load_dir = load_dir or self.load_dir
        for i, layer_scope in enumerate(self.scopes):
            vars_to_load = [v for v in tf.global_variables() if
                            v.name.startswith(layer_scope)]
            self._vars.extend(vars_to_load)
            session.run(tf.variables_initializer(var_list=vars_to_load))
            if load_dir and vars_to_load:
                saver = tf.train.Saver(vars_to_load)
                saver.restore(session, os.path.join(load_dir, self.name))

    def save_params(self, session, step=None):
        assert self.write_dir
        info('Saving {0} to {1}'.format(self.name, self.write_dir))
        saver = tf.train.Saver(list(set(self._vars)))
        saver.save(session, os.path.join(self.write_dir, self.name), global_step=step)

    def save_graph(self, session:tf.Session, fname:str, var_names:list):
        frozen_graph = convert_variables_to_constants(session, session.graph_def, var_names)
        tf.train.write_graph(frozen_graph, self.write_dir, fname + '.pb', as_text=False)
        tf.train.write_graph(frozen_graph, self.write_dir, fname + '.txt', as_text=True)


class Policy(Model):

    def __init__(self,
                 session:tf.Session,
                 name=None,
                 load_dir=None,
                 trainable=False,
                 selection='greedy',
                 device='GPU'):
        super().__init__(name="POLICY_{}".format(name),
                         load_dir=load_dir,
                         trainable=trainable,
                         device=device)
        self.session = session
        self.state = tf.placeholder(dtype=tf.int32,
                                    shape=(None, 8, 4),
                                    name="state")
        self.action_label = tf.placeholder(dtype=tf.int32,
                                           shape=(None, 128),
                                           name="action")
        self.selection = selection
        self._constructor()

    def _build_graph(self):

        #################### Graph inputs ####################
        self.batch_size = tf.placeholder(shape=(), dtype=tf.float32, name="batch_size")
        self.keep_prob = tf.placeholder(shape=(),
                                        dtype=tf.float32,
                                        name="keep_prob") if self.trainable \
                    else tf.constant(value=1,
                                     dtype=tf.float32,
                                     name="keep_prob")
        self.lr = tf.placeholder(shape=(), dtype=tf.float32, name="learning_rate")
        self.adv = tf.placeholder(shape=(None), dtype=tf.float32, name="advantage")

        ##################### Data layer #####################
        X = tf.expand_dims(tf.cast(self.state, tf.float32), axis=3)

        ###################### Inception #####################
        with tf.variable_scope("INCEPTION", reuse=False) as scope:
            self.scopes.append(scope.name)
            for i, (ksizes, nkernels) in enumerate(zip(KERNEL_SIZES, N_KERNELS)):
                conv = []
                for ks, nk in zip(ksizes, nkernels):
                    w = tf.get_variable(shape=[ks[0], ks[1], X.shape[-1], nk],
                                        initializer=PARAM_INIT,
                                        trainable=self.trainable,
                                        name='incep_{0}_w_K{1}{2}'.format(i + 1, ks[0], ks[1]))
                    b = tf.get_variable(shape=[nk],
                                        initializer=PARAM_INIT,
                                        trainable=self.trainable,
                                        name='incep_{0}_b_K{1}{2}'.format(i + 1, ks[0], ks[1]))
                    c = tf.nn.conv2d(X, w, strides=[1, 1, 1, 1], padding='SAME')
                    z = INCEP_ACT(c + b)
                    conv.append(z)
                X = tf.concat(conv, axis=3)
            X = tf.nn.dropout(X, keep_prob=self.keep_prob)

        ####################### Flatten ######################
        conv_out = tf.contrib.layers.flatten(inputs=X)
        X = conv_out
        hwy_size = X.shape[-1]

        ####################### Highway ######################
        with tf.variable_scope("HIGHWAY", reuse=False) as scope:
            self.scopes.append(scope.name)
            for i in range(HWY_LAYERS):
                with tf.variable_scope('HWY_{}'.format(i)):
                    wh = tf.get_variable(shape=[hwy_size, hwy_size],
                                         initializer=PARAM_INIT,
                                         trainable=self.trainable,
                                         dtype=tf.float32,
                                         name="hwy_w_{}".format(i + 1))
                    bh = tf.get_variable(shape=[hwy_size],
                                         initializer=PARAM_INIT,
                                         trainable=self.trainable,
                                         dtype=tf.float32,
                                         name="hwy_b_{}".format(i + 1))
                    wt = tf.get_variable(shape=[hwy_size, hwy_size],
                                         initializer=PARAM_INIT,
                                         trainable=self.trainable,
                                         dtype=tf.float32,
                                         name="T_w_{}".format(i + 1))
                    T = tf.sigmoid(tf.matmul(X, wt) + HWY_BIAS)
                    H = tf.nn.relu(tf.matmul(X, wh) + bh)
                    X = T * H + (1.0 - T) * X
            X = tf.nn.dropout(X, keep_prob=self.keep_prob)
            X = tf.concat([X, conv_out], axis=1)

        ####################### Output #######################
        with tf.variable_scope("OUTPUT", reuse=False) as scope:
            self.scopes.append(scope.name)
            w = tf.get_variable(shape=[X.shape[-1], 128],
                                initializer=PARAM_INIT,
                                trainable=self.trainable,
                                dtype=tf.float32,
                                name="w_logit")
            b = tf.get_variable(shape=[128],
                                initializer=PARAM_INIT,
                                trainable=self.trainable,
                                dtype=tf.float32,
                                name="b_logit")
            self.logits = tf.add(tf.matmul(X, w), b, name="policy_logits")
            self.softmax = tf.nn.softmax(logits=self.logits, axis=1, name="policy_softmax")
            self.action = tf.argmax(input=self.softmax, axis=1, name="action")
            self.probs, self.actions = tf.nn.top_k(input=self.softmax, k=128, sorted=True)

        ####################### Metrics ######################
        with tf.variable_scope("METRICS", reuse=False) as scope:
            self.scopes.append(scope.name)
            self.top_1_acc = tf.metrics.accuracy(labels=tf.argmax(self.action_label, axis=1),
                                                 predictions=self.action,
                                                 name="accuracy")
            self.top_2_acc = tf.reduce_mean(
                tf.cast(tf.nn.in_top_k(self.softmax, tf.argmax(self.action_label, axis=1), 2), tf.float32))
            self.top_3_acc = tf.reduce_mean(
                tf.cast(tf.nn.in_top_k(self.softmax, tf.argmax(self.action_label, axis=1), 3), tf.float32))
            self.top_5_acc = tf.reduce_mean(
                tf.cast(tf.nn.in_top_k(self.softmax, tf.argmax(self.action_label, axis=1), 5), tf.float32))
            self.top_10_acc = tf.reduce_mean(
                tf.cast(tf.nn.in_top_k(self.softmax, tf.argmax(self.action_label, axis=1), 10), tf.float32))

        ###################### Optimizer #####################
        if self.trainable:
            with tf.variable_scope("LOSS", reuse=False) as scope:
                self.scopes.append(scope.name)
                self.step = tf.Variable(0, trainable=False)
                self.reg_loss = LAMBDA * tf.add_n(
                    [tf.nn.l2_loss(v) for v in tf.global_variables() if v.name.__contains__("w_")])
                self.cross_entropy = tf.nn.softmax_cross_entropy_with_logits_v2(
                        labels=self.action_label,
                        logits=self.logits,
                        name="cross_entropy")
                self.loss1 = tf.add(self.reg_loss, self.cross_entropy, name="loss1")
                self.optimizer1 = tf.train.AdamOptimizer(learning_rate=self.lr,
                                                         name="optimizer_pretrain")
                self.grad_update1 = self.optimizer1.minimize(
                    loss=self.loss1,
                    var_list=tf.trainable_variables(),
                    global_step=self.step,
                    name="grad_update")
                self.gradlogprob_adv = self.adv * tf.log(self.softmax)
                self.pg_loss = tf.reduce_mean(input_tensor=-self.gradlogprob_adv,
                                              axis=1,
                                              name="pg_loss")
                self.optimizer2 = tf.train.RMSPropOptimizer(learning_rate=self.lr,
                                                            decay=0.99,
                                                            epsilon=1e-5)
                # Use minimize() to build the policy-gradient update; apply_gradients()
                # expects (gradient, variable) pairs rather than a loss and a var list.
                self.policy_update = self.optimizer2.minimize(
                    loss=self.pg_loss,
                    var_list=self.vars,
                    global_step=self.step,
                    name="policy_update")

    @property
    def vars(self):
        return [v for v in tf.trainable_variables() if
                v.name.lower().__contains__(self.name.lower())]

Training results

This code passes 150,000 mini-batches of 128 samples each to the CNN for training. The model performance is summarized below. The inception model seems to capture collective strategy from the data pretty well.
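The training driver itself is not shown here. A minimal sketch of what it could look like, assuming the pickled examples have been unpacked into aligned arrays of boards (N, 8, 4) and one-hot actions (N, 128); the batch construction and hyperparameters below are illustrative, not the repository’s actual values:

import numpy as np
import tensorflow as tf

# Placeholder arrays standing in for the ~440k board-move pairs produced by parser.py.
boards = np.zeros((440000, 8, 4), dtype=np.int32)
actions = np.zeros((440000, 128), dtype=np.int32)

session = tf.Session()
policy = Policy(session, name='pretrain', trainable=True)
session.run(tf.global_variables_initializer())

for step in range(150000):
    idx = np.random.randint(0, len(boards), size=128)    # mini-batch of 128 samples
    feed = {policy.state: boards[idx],
            policy.action_label: actions[idx],
            policy.keep_prob: 0.9,                       # dropout keep probability (assumed)
            policy.lr: 1e-4}                             # learning rate (assumed)
    _, ce = session.run([policy.grad_update1, policy.cross_entropy], feed_dict=feed)
    if step % 1000 == 0:
        print(step, float(np.mean(ce)))

In practice you would also hold out a validation split and track the top-k accuracies defined in the METRICS scope.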

The Engine

The program shown below facilitates gameplay between a human and the predictive model. The engine makes calls to the model and to the human player (via a shell-based interface) and then uses the aforementioned State class to keep track of board states. When the computer makes a move, the engine displays the top-5 predictions from the model. Because the model has no explicit instruction in ‘how’ to play checkers (the rules can be found here), the engine also ensures that each move is valid (by querying the ranked predictions until it finds a valid one). I have observed invalid predictions on a few occasions. At every computer move, the engine also checks for jumps (and subsequent jumps). I’ve found that the model outputs jumps correctly in the vast majority of cases. The code for this engine is given below.

import numpy as np

# Board constants (BOARD_INIT, BOARD_VIEW, VALID_POS, NEIGHBORS, NEXT_NEIGHBORS,
# JUMPS, IV, EMPTY, the P1_/P2_ piece and direction codes, KING_DIR, SELECT_N,
# SELECT_EPS, the REWARD_* values) and the Trajectory and MCTS classes are
# defined elsewhere in the repository and assumed to be imported here.


class State:

    """
             VIEW_1
           (key: -1)
          00  01  02  03
        04  05  06  07
          08  09  10  11
        12  13  14  15
          16  17  18  19
        20  21  22  23
          24  25  26  27
        28  29  30  31
             VIEW_0
            (key: 1)
         dir3   dir0
             pos
         dir2   dir1
    """
    shell_pieces = {
        EMPTY: '-   ',
        P2_CHKR: 'x   ',
        P2_KING: 'X   ',
        P1_CHKR: 'o   ',
        P1_KING: 'O   '
    }
    dir_transform = {
        0: 2,
        1: 3,
        2: 0,
        3: 1
    }

    def __init__(self):
        self.init()


    def init(self):
        self.state = BOARD_INIT.copy()
        self.valid_jumps = {1: [], -1: []}

    def print(self, verbose=True):
        if not verbose:
            return
        board = self.state.copy()
        board_str = BOARD_VIEW[0]
        for i, x in enumerate(board):
            if not i % 4:
                board_str += BOARD_VIEW[i // 4 + 1]
            if not i % 8:
                board_str += "  "
            board_str += self.shell_pieces[x]
            if not (i + 1) % 4:
                board_str += "\n"
        board_str += BOARD_VIEW[-1]
        print(board_str)

    def get_state_view(self, view=1):
        """
        :param view: 1 or -1
        :return: board state from perspective 'view'
        """
        if view == -1:
            return -self.state.copy()[::-1]
        else:
            return self.state.copy()

    @staticmethod
    def transform_position(pos):
        return 31 - pos

    def transform_action(self, action):
        a = np.zeros((128,))
        a[action] = 1
        ind = np.argwhere(a.reshape(32, 4) == 1)[0]
        action = np.zeros((32, 4))
        action[self.transform_position(ind[0]),
               self.dir_transform[ind[1]]] = 1
        return np.argwhere(action.reshape(-1) == 1)[0]

    def transform_direction(self, direction):
        return self.dir_transform[direction]

    def get_pieces_and_directions(self, player):
        chkr_dir = P1_CHKR_DIR if player == 1 else P2_CHKR_DIR
        chkr_piece = P1_CHKR if player == 1 else P2_CHKR
        king_piece = P1_KING if player == 1 else P2_KING
        kings_row = P1_KINGS_ROW if player == 1 else P2_KINGS_ROW
        return chkr_piece, king_piece, chkr_dir, KING_DIR, kings_row

    def __get_jumps(self):
        """
        :returns updated available_jumps member
        """
        for player in (1, -1):
            state = self.get_state_view()
            valid_jumps = []
            chkr, king, chkr_dir, king_dir, _ = self.get_pieces_and_directions(player)
            for position in VALID_POS:
                piece = state[position]
                if not piece in (chkr, king):
                    continue
                for direction in king_dir if piece == king else chkr_dir:
                    neighbor = NEIGHBORS[position][direction]
                    next_neighbor = NEXT_NEIGHBORS[position][direction]
                    if neighbor == IV or next_neighbor == IV:
                        continue
                    elif state[next_neighbor] == EMPTY and \
                         state[neighbor] in (-chkr, -king):
                        valid_jumps.append((position, next_neighbor))
            self.valid_jumps[player] = valid_jumps

    @staticmethod
    def positions2action(pos_init, pos_final):
        """
        :param pos_init: initial position (w.r.t VIEW_0)
        :param pos_final: final position (w.r.t VIEW_0)
        :return: action (32x4 -> posxdir)
        """
        action = np.zeros([32, 4])
        if pos_final in NEXT_NEIGHBORS[pos_init]:
            direction = NEXT_NEIGHBORS.get(pos_init).index(pos_final)
        elif pos_final in NEIGHBORS[pos_init]:
            direction = NEIGHBORS.get(pos_init).index(pos_final)
        else:
            assert False, "{0} not reachable from {1}".format(pos_final, pos_init)
        action[pos_init, direction] = 1
        return action

    def action2positions(self, action, player):
        """
        :param action: (32x4 -> posxdir) (w.r.t. VIEW_0)
        :param player: player (1 or -1)
        :return: pos_init, pos_final, move_type
        """
        pos, direction = np.argwhere(action.reshape(32, 4) == 1)[0]
        neighbor = NEIGHBORS[pos][direction]
        next_neighbor = NEXT_NEIGHBORS[pos][direction]
        if (pos, next_neighbor) in self.valid_jumps[player]:
            return pos, next_neighbor, 'jump'
        else:
            return pos, neighbor, 'standard'

    def __set_state(self, state):
        self.state = state
        self.__get_jumps()

    def update(self, pos_init, pos_final, player, move_type, set_state=True, verbose=False):
        """
        :param pos_init: initial position (w.r.t. VIEW_0)
        :param pos_final: final_position (w.r.t. VIEW_0)
        :param player: player (1 or -1)
        :param move_type: standard or jump
        :param set_state: update internal state
        :param verbose:
        :return:
        """
        validations = []
        state = self.get_state_view()
        chkr, king, chkr_dir, king_dir, kings_row = self.get_pieces_and_directions(player)
        piece = self.state[pos_init]
        valid_dir = chkr_dir if piece == chkr else king_dir
        validations.append(piece in (chkr, king) and state[pos_final] == EMPTY)
        if move_type == 'standard':
            validations.append(pos_final in [NEIGHBORS[pos_init][i] for i in valid_dir])
        elif move_type == 'jump':
            eliminated = int(JUMPS[pos_init, pos_final])
            validations.append(state[eliminated] in (-chkr, -king))
        if all(validations):
            state[pos_final] = state[pos_init]
            state[pos_init] = EMPTY
            if pos_final in kings_row and piece == chkr:
                state[pos_final] = king
            if move_type == 'jump':
                state[eliminated] = EMPTY
                if verbose:
                    print('Position eliminated: %d' % eliminated)
            if set_state:
                self.__set_state(state)
        else:
            raise NotImplementedError


class Game:

    """
         PLAYER2
      00  01  02  03
    04  05  06  07
      08  09  10  11
    12  13  14  15
      16  17  18  19
    20  21  22  23
      24  25  26  27
    28  29  30  31
        PLAYER1
    """

    def __init__(self, player1, player2, critic1=None, critic2=None, verbose=False):
        self.state = State()
        self.player1 = player1
        self.critic1 = critic1
        self.player2 = player2
        self.critic2 = critic2
        self.verbose = verbose
        self.aborted_games = 0
        self.reset()

    def reset(self):
        self.state.init()
        self.winner = None
        self.traj = {
            1: Trajectory(),
            -1: Trajectory()
        }

    def print(self, contents):
        if self.verbose:
            if not type(contents) in (list, tuple, range):
                print(contents)
            else:
                print(contents[:5])

    def generate_action(self, player):

        contestant = self.player1 if player == 1 else self.player2
        actions_list, probs_list, actions, probs = [], [], [], []

        # ----- Human player ----- #
        if contestant == 'human':
            move_invalid = True
            while move_invalid:
                move = input("Enter move as 'pos_init pos_final':")
                if move.lower() == 'forfeit':
                    self.winner = -player
                    move_invalid = False      # exit the prompt loop; caller sees empty actions_list
                elif move.lower() == 'draw':
                    self.winner = 'draw'
                    move_invalid = False
                else:
                    positions = tuple([int(x) for x in move.split(' ')])
                    if len(positions) != 2 or not all([x in range(32) for x in positions]):
                        self.print("Invalid move. Try again")
                        continue
                    actions_list.append(positions)
                    probs_list.append(1.0)
                    move_invalid = False

        # ----- Policy player ----- #
        elif isinstance(contestant, Policy):
            board_state = self.state.get_state_view(view=player)
            feed_dict = {
                contestant.state: board_state.reshape(1, 8, 4).astype(np.int32),
            }
            probs, actions = contestant.session.run(fetches=[contestant.probs, contestant.actions],
                                                    feed_dict=feed_dict)
            probs, actions = probs[0, :SELECT_N], actions[0, :SELECT_N]
            order = np.arange(0, SELECT_N)
            if contestant.selection == 'eps-greedy':
                if np.random.random() <= SELECT_EPS:
                    order = np.random.choice(order,
                                             size=SELECT_N,
                                             replace=False,
                                             p=(probs + 1e-6) / np.sum(probs + 1e-6))
            elif contestant.selection == 'multinomial':
                order = np.random.choice(order,
                                         size=SELECT_N,
                                         replace=False,
                                         p=(probs + 1e-6) / np.sum(probs + 1e-6))
            for i, (prob, action) in enumerate(zip(probs[order], actions[order])):
                move = np.zeros((128,))
                action = action if player == 1 else self.state.transform_action(action)
                move[action] = 1
                pos_init, pos_final, move_type = self.state.action2positions(move, player=player)
                if pos_init and pos_final:
                    actions_list.append((pos_init, pos_final))
                    probs_list.append(prob)

        # ----- MCTS player ----- #
        elif isinstance(contestant, MCTS):
            # Todo: Implement
            pass

        return actions_list, probs_list, actions, probs

    def status_check(self, move_count):
        n_p1 = len(np.argwhere(self.state.get_state_view() > EMPTY))
        n_p2 = len(np.argwhere(self.state.get_state_view() < EMPTY))
        piece_differential = n_p1 - n_p2
        if n_p1 == 0:
            self.winner = -1
        elif n_p2 == 0:
            self.winner = 1
        elif move_count >= 40 and \
                piece_differential > 2:
            self.winner = 1
        elif move_count >= 40 and \
                piece_differential < -2:
            self.winner = -1
        elif move_count >= 50 and \
                piece_differential > 3:
            self.winner = 1
        elif move_count >= 50 and \
                piece_differential < -3:
            self.winner = -1
        elif move_count >= 60:
            if piece_differential > 0:
                self.winner = 1
            elif piece_differential < 0:
                self.winner = -1
            else:
                self.winner = 'draw'

    def play(self):

        self.reset()
        player = 1
        move_count = 1
        abort = False

        self.print('======================================================================')
        self.print('Checkers.ai')
        self.print('Author: Chris Larson')
        self.print('Player1: {0}'.format(self.player1))
        self.print('Player2: {0}'.format(self.player2))
        self.print("To forfeit the match, type 'forfeit' when prompted for a move")

        while not any((abort, self.winner)):

            self.print('==================================================================')
            self.print('Move: {}'.format(move_count))
            self.print("Turn: player{}\n".format(1 if player == 1 else 2))
            self.state.print(self.verbose)

            move_available = True
            valid_jumps = self.state.valid_jumps[player]

            while move_available:

                actions_list, probs_list, actions, probs = self.generate_action(player=player)
                if not actions_list and self.winner:
                    break
                else:
                    self.traj[player].update(state=self.state.get_state_view(view=player),
                                             softmax=probs)

                if any((player == 1 and self.player1 == 'human',
                        player == -1 and self.player2 == 'human')) \
                    and len(actions_list[0]) != 2:
                        self.print("Invalid move. Try again")
                        continue

                if valid_jumps:
                    self.print("Available jumps: {}".format(valid_jumps))
                    intersection = list(set(actions_list).intersection(valid_jumps))
                    if intersection:
                        idx = np.argsort([probs_list[actions_list.index(x)] for x in intersection])[::-1]
                        pos_init, pos_final = intersection[idx[0]]
                    else:
                        self.print("{} did not select one of the available jumps, jump forced.".format(
                            "Player1" if player == 1 else "Player2")
                        )
                        idx = np.random.choice(range(len(valid_jumps)), size=1)[0]
                        pos_init, pos_final = valid_jumps[idx]
                    try:
                        self.state.update(pos_init=pos_init,
                                          pos_final=pos_final,
                                          player=player,
                                          move_type='jump')
                    except NotImplementedError:
                        abort = True
                        break
                    valid_jumps = [x for x in self.state.valid_jumps[player] if x[0] == pos_final]
                    if not valid_jumps:
                        move_available = False

                else:
                    move_implemented = False
                    for i, (pos_init, pos_final) in enumerate(actions_list):
                        try:
                            self.state.update(pos_init=pos_init,
                                              pos_final=pos_final,
                                              player=player,
                                              move_type='standard')
                        except NotImplementedError:
                            if any((player == 1 and self.player1 == 'human',
                                    player == -1 and self.player2 == 'human')):
                                self.print("Invalid move. Try again")
                                break
                            else:
                                continue
                        else:
                            move_implemented = True
                            self.print('Move: [%s, %s]' % (pos_init, pos_final))
                            break
                    if not move_implemented and any((player == 1 and type(self.player1) in (Policy, MCTS),
                                                     player == -1 and type(self.player2) in (Policy, MCTS))):
                        self.print("This code is intended to iterate over all moves, state-update NotImplemented")
                        abort = True
                        break
                    elif not move_implemented:
                        continue
                    else:
                        move_available = False

            # Game status
            self.status_check(move_count)
            if abort:
                self.winner = 'null'
                self.aborted_games += 1
            move_count += 1
            player *= -1

        # Print out game stats
        self.print('Ending board:')
        end_board = self.state.get_state_view()
        self.print(end_board.reshape(8, 4))
        n_p1_chkr = len(np.argwhere(end_board == P1_CHKR))
        n_p1_king = len(np.argwhere(end_board == P1_KING))
        n_p2_chkr = len(np.argwhere(end_board == P2_CHKR))
        n_p2_king = len(np.argwhere(end_board == P2_KING))

        if self.winner == 'draw':
            self.print('The game ended in a draw.')
        else:
            self.print('%s wins' % (self.player1 if self.winner == 1 else self.player2))
        self.print('Total number of moves: %d' % move_count)
        self.print('Remaining P1 pieces: (checkers: %d, kings: %d)' % (n_p1_chkr, n_p1_king))
        self.print('Remaining P2 pieces: (checkers: %d, kings: %d)' % (n_p2_chkr, n_p2_king))


        if self.winner == 1:
            self.traj[1].update(terminal_rwd=REWARD_WIN)
            self.traj[-1].update(terminal_rwd=REWARD_LOSS)
        if self.winner == -1:
            self.traj[-1].update(terminal_rwd=REWARD_WIN)
            self.traj[1].update(terminal_rwd=REWARD_LOSS)
        elif self.winner == 'draw':
            self.traj[1].update(terminal_rwd=REWARD_DRAW)
            self.traj[-1].update(terminal_rwd=REWARD_DRAW)
        elif self.winner == 'null':
            pass
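To play against the pretrained policy from the shell, a minimal driver looks roughly like this; the checkpoint directory name is an assumption, and Policy.init restores the weights written by save_params:

import tensorflow as tf

session = tf.Session()
policy = Policy(session, name='pretrained', load_dir='checkpoints/', trainable=False)
policy.init(session)                  # restore the pretrained weights

game = Game(player1='human', player2=policy, verbose=True)
game.play()
print('Winner:', game.winner)         # 1, -1, 'draw', or 'null' (aborted)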

Engine performance versus humans

I showed above that this model can learn individual moves, but the more interesting question is whether it exhibits the high-level strategy required to win games. Below are tabulated results from four games against human opponents. One of the players was a self-described “competent, but not a master” checkers player, and the CNN engine won easily, maintaining a clear advantage throughout the game. The second individual plays at a fairly high level (this person beats the aforementioned player) and was also defeated by the CNN, though after a lengthier game of 100 moves. The other two human players play at a fairly advanced level and were able to overcome the CNN, but not easily. Importantly, the CNN maintained sound play throughout the early- and mid-game.

Versus Humans

One of the interesting findings was that the CNN engine can generalize and play sound checkers on board states it has never seen before. An example of this was the endgame between the CNN and the intermediate-level human. The game-ending sequence is shown below. On move 95, the CNN had a clear opportunity to defend its own checker by moving from position 23 to 27, but instead moved in the opposite direction, sacrificing its own checker to allow the opposing king to move into a very unfavorable position (from the human’s perspective). The CNN engine then played the perfect line given the human’s subsequent move into kings row, and was able to secure the win one move later.

The two games against the more advanced human players were recorded and analyzed with the Cake checkers engine’s analysis tool in order to track the CNN’s performance throughout each game. The CNN lost both games but displayed a similar pattern of performance. For the first portion of these games, the engine held clear winning positions through the first 25 moves (either by a clear advantage in the number of kings and pieces, or as determined by the Cake engine analysis in situations where piece counts were closer). However, as the boards became more sparse, the engine became prone to game-changing blunders, which ultimately led to its defeat.

Engine performance versus Checkers Deluxe and Cake

As a more concrete measure of performance, I show the tabulated results of our model versus Checkers Deluxe, which is a popular checkers-playing app. The app has four skill levels; as can be seen, the CNN defeated all levels except Expert. Even though it couldn’t defeat the Expert level, it performed remarkably well for 90+ moves before losing on the 97th move.

Versus Checkers Deluxe Engine

The CNN also lost twice to the world-championship, search-based checkers engine Cake. This engine uses advanced search algorithms, along with endgame databases, to generate moves. Despite the clear disadvantage of having no search, the CNN maintained reasonable positions for 20+ moves in each game. According to the Cake analysis tool, the CNN’s first 5-7 outputs were book moves, which makes sense given that those would be the most common moves in the training database. The figure below shows a snapshot of the first game after 20 moves, shortly before Cake (white) gained a winning position.

CNN's position (gold pieces) after 20 moves vs. the Cake engine

Game play

The source code for training and gameplay can be found here. Some day I hope to wrap the game engine in a GUI; for now it just runs in the shell or your favorite Python IDE. Feel free to download the source code and see if you can beat it. Tip: run a browser-based checkers game in 2-player mode alongside this game for a better experience. The shell interface looks like this …

Final thoughts

The above results indicate that intermediate-level checkers play can be achieved without the use of search, which is pretty cool in its own right. While the engine typically maintains a clear advantage through 20+ moves, it tends to suffer the occasional blunder in the mid-to-late game against stronger opponents, as the board becomes sparse and less familiar. This is expected, and it motivates further learning through self-play. In the next post I’ll describe how we can factor checkers games into a set of conditional probability distributions, which provides an elegant means of improving our pretrained policy by playing games, observing outcomes, assigning rewards, and propagating reward signals back through the network to make moves that positively correlate with winning more probable under the policy.