Who loves restrictions? Even if they provide some kind of speed-up or ensure the simplicity of an algorithm, I guess the answer is: most 'reasonable' people. But I can't say I'm one of them when I want to have some fun, so I'll go ahead and try to bring backtracking to a 3rd party, simple, push-down, stateful LL(1) Python parser.

Disclaimer: this post is not written with any intent of producing production-ready code; it is just a fun experiment to see whether we can do it or not.

What is 'Backtracking'?

The simplest (but definitely not the most formal) way of describing this concept is an analogy with database transactions and rollbacks. During a parse operation, when we encounter a token that produces multiple transitions and the current state allows moving forward on more than one of them, every alternative path is represented as a transaction. When we start to process an alternative, we save the current state and locals (like the input position, since we are going to eat tokens to determine whether we are on the right path or not); if we fail, we roll back to the saved state, and if we succeed, we move on.
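
To make the analogy concrete, here is a minimal, parser-agnostic sketch of that idea. Everything in it (the ParseError exception, the parser object with a position attribute, the callable alternatives) is made up for illustration; it is not the actual parser we'll be touching;

class ParseError(Exception):
    pass

def try_alternatives(parser, alternatives):
    # Treat every alternative as a transaction: save the input position,
    # try the alternative, and roll back if it fails.
    for alternative in alternatives:
        saved_position = parser.position       # begin the "transaction"
        try:
            return alternative(parser)         # commit: keep the progress
        except ParseError:
            parser.position = saved_position   # rollback: restore the state
    raise ParseError('no alternative matched')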

Multiple States from a 'Single Token'

If you have worked with a language where identifiers have the same form as other language constructs, you know the concept of 'keywords'. Take the 'else' keyword: in theory it fits the shape of a normal identifier, and it would otherwise be possible to assign to it. But state machines that need to determine routes based on tokens don't inspect the value of every token; only when an identifier's value is reserved as a string in the grammar do they use that value, instead of the token type, as a distinct key in the transition table.

def token_to_transition(parser, token):
    if token in parser.reserved_strings:
        return token.value
    else:
        return token.type

But you might ask: there is still only one transition returned from that function, even when the input is a keyword, which means there are no alternatives. You are right. With this kind of implementation, you cannot use reserved strings (a.k.a. keywords) in other places, even where the token's type (NAME) would fit. You can only use them in the spots you spelled out as 'else' in the raw grammar. At least, not without 'soft keywords'.

Soft Keywords

Uhm, another different concept that got into our lives. I first saw this one in a commit to CPython and saw the first real example of it while reading PEP 622, a proposal to bring a 'pattern matching statement' to Python. Since introducing a new statement should be handled with a keyword, they chose 'match' and 'case'. But reserving another name in a new Python version would break a lot of code, a lot. It would even break use cases in the stdlib (e.g. re.match()); of course, the authors considered that too, and they made these 2 keywords 'soft'. Soft keywords act like normal keywords in the right contexts, and in other places they act like a usual 'NAME' token. To do that, when there is some kind of ambiguity, the parser tries all alternatives and backtracks. Let's draft a token_to_transition function that supports 'soft' keywords. Now the 'token_to_transition' function will return either a 1-element or a 2-element tuple depending on the code path.

def token_to_transition(parser, token):
    if token in parser.soft_keywords:
        return token.value, token.type
    elif token in parser.reserved_strings:
        return token.value,
    else:
        return token.type,

So, if you make 'import' a soft keyword, then when the parser is in a state that expects either a 'NAME' or the 'import' keyword, it will try both and see which path can be continued successfully.
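
To make the 'try both' behaviour tangible, here is a tiny, self-contained rendition of the dispatch above. The Token tuple and the value-based membership checks are simplifications for illustration, not the real parser's API;

from collections import namedtuple

Token = namedtuple('Token', ['type', 'value'])

SOFT_KEYWORDS = {'import'}
RESERVED_STRINGS = {'else', 'if', 'while'}

def token_to_transition(token):
    if token.value in SOFT_KEYWORDS:
        # ambiguous: try the keyword route and the plain NAME route
        return token.value, token.type
    elif token.value in RESERVED_STRINGS:
        return (token.value,)
    else:
        return (token.type,)

print(token_to_transition(Token('NAME', 'import')))  # ('import', 'NAME')
print(token_to_transition(Token('NAME', 'x')))       # ('NAME',)
print(token_to_transition(Token('NAME', 'else')))    # ('else',)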

Let's Implement

So, the first thing we need is some kind of strategy: are we going to consume all tokens beforehand and fill up the memory, or not? As you might remember from my previous post, it is not uncommon for parsers to take lazy lexers as their producer and consume the input one token at a time as they proceed. They also don't store previously produced tokens. But when we do backtracking, we need to consume 'enough' tokens ahead of time, and we also need to roll back when an alternative fails.

Since consuming all the tokens beforehand wouldn't be as pleasant as what we were used to before backtracking, I plan to operate in the best way I can: behave as usual when there is no ambiguity and no place to backtrack, and when there is, consume as little as possible, and as soon as we are in the clear, go back to the old way of yielding tokens one by one.

     def parse(self, tokens):
         first_dfa = self._pgen_grammar.nonterminal_to_dfas[self._start_nonterminal][0]
         self.stack = Stack([StackNode(first_dfa)])
+        self._tokens = _TokenGeneratorProxy(tokens)

-        for token in tokens:
+        for token in self._tokens:
             self._add_token(token)

Let's try to draft a _TokenGeneratorProxy class;

class _TokenGeneratorProxy:
    def __init__(self, generator):
        self._tokens = generator
        self._counter = 0
        self._release_ranges = []  # bookkeeping for the lookahead, used later in this post

    def __iter__(self):
        return self

    def __next__(self):
        token = next(self._tokens)
        self._counter += 1
        return token

We just implemented the iterator protocol with __iter__ (which returns the iterator itself) and __next__ (the function that is called on every iteration). For now, we're just keeping a counter and returning the value we get from the original producer. But what we need is a way of looking ahead without breaking the whole mechanism. What I have in mind is something like this;

def process(self, token):
    ...
    self.eat_and_print_next_token()

    with self._tokens.release() as proxy:
        for needed in range(2):
            proxy.eat(needed)
        print('end of for loop')
        proxy.eat(0)

    self.eat_and_print_next_token()
    self.eat_and_print_next_token()
    self.eat_and_print_next_token()

Which in theory should work like this;

eating 'NAME' at the position '0'

releasing the producer

eating 'OPERATOR' at the position '0'
eating 'NAME' at the position '1'

end of for loop

using cached token 'OPERATOR' from the position '0'

locking the producer

using cached token 'OPERATOR' from the position '1'
using cached token 'NAME' from the position '2'
eating 'NEWLINE' at the position '3'

It might look a little complex, but let me explain. Without any 'release' operation, the proxy just consumes from the producer and yields tokens without storing anything. When there is a release action, it marks a point (the counter) and records every token it consumes until it is locked again (at the exit of the context manager). While released, proxy.eat works with a relative index: if you had eaten 1 token (index 0) before the release and then call proxy.eat(0), it will eat the token at absolute index 1. Also, a proxy.eat(0) performed after another proxy.eat(0) returns the same token (as long as both happen within the same release). And it doesn't matter how far ahead you have eaten within a release: after the re-locking operation the position is reset to the release point, and the tokens you already consumed come back out of the cache.

@contextmanager
def release(self):
    self._release_ranges.append([self._counter, None, []])
    try:
        yield self
    finally:
        # Lock the last release range to the final position that
        # has been eaten.
        total_eaten = len(self._release_ranges[-1][2])
        self._release_ranges[-1][1] = self._counter + total_eaten

The _release_ranges is a list of 3-item records. The first item is the position (the counter) at which we started eating tokens, the second one is the end position (which is set after the re-locking operation), and the third one is another list containing the tokens eaten in that range.
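
For example (purely to illustrate the shape of a record, with plain strings standing in for real token objects), a release that started at counter 2 and ate three tokens would end up as;

# [start, end, eaten_tokens]
release_range = [2, 5, ['OP', 'NAME', 'NEWLINE']]

start, end, eaten_tokens = release_range
assert end - start == len(eaten_tokens)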

def eat(self, point):
    eaten_tokens = self._release_ranges[-1][2]
    if point < len(eaten_tokens):
        return eaten_tokens[point]
    else:
        while point >= len(eaten_tokens):
            token = next(self._tokens)
            eaten_tokens.append(token)
        return token

The eaten_tokens list (the last element of the record) is ordered, so an index into it points to the actual token that lies at self._counter + point. It also works if you try to jump ahead: eat pulls every missing token up to the requested position.

def __next__(self):
    # If the current position was already looked ahead (inside a release
    # range), return the cached token; if not, just go further on the
    # given token producer.
    for start, end, tokens in self._release_ranges:
        assert end is not None
        if start <= self._counter < end:
            token = tokens[self._counter - start]
            break
    else:
        token = next(self._tokens)
    self._counter += 1
    return token

And finally, we need an extra for loop in __next__ to go through all the release records and try to match the current position; if it is found in one of them, we just use the cached token. Otherwise (the else branch of the for loop, which only runs when we never break) we pull a new token from the producer.

def can_advance(self, to):
    # Try to eat; fail if we can't. The eat operation is cached,
    # so there won't be any additional cost to eating here.
    try:
        self.eat(to)
    except StopIteration:
        return False
    else:
        return True

Since we are already caching the results of eat, it is much easier to implement a can_advance function: it just tries to eat up to the given point, and if the producer can't produce any more tokens it returns False, otherwise True.
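
Since the proxy was built up piece by piece, here is the whole thing assembled in one place, together with a tiny demo over a fake token stream. The plain string tokens are stand-ins for real token objects, and this is a sketch of the pieces above rather than the exact production code;

from contextlib import contextmanager

class _TokenGeneratorProxy:
    def __init__(self, generator):
        self._tokens = generator
        self._counter = 0
        self._release_ranges = []

    def __iter__(self):
        return self

    def __next__(self):
        # Serve the token from a cached release range if this position
        # was already looked ahead, otherwise pull from the producer.
        for start, end, tokens in self._release_ranges:
            assert end is not None
            if start <= self._counter < end:
                token = tokens[self._counter - start]
                break
        else:
            token = next(self._tokens)
        self._counter += 1
        return token

    @contextmanager
    def release(self):
        self._release_ranges.append([self._counter, None, []])
        try:
            yield self
        finally:
            # Lock the last release range to the final position eaten.
            total_eaten = len(self._release_ranges[-1][2])
            self._release_ranges[-1][1] = self._counter + total_eaten

    def eat(self, point):
        eaten_tokens = self._release_ranges[-1][2]
        if point < len(eaten_tokens):
            return eaten_tokens[point]
        else:
            while point >= len(eaten_tokens):
                token = next(self._tokens)
                eaten_tokens.append(token)
            return token

    def can_advance(self, to):
        try:
            self.eat(to)
        except StopIteration:
            return False
        else:
            return True

proxy = _TokenGeneratorProxy(iter(['NAME', 'OP', 'NAME', 'NEWLINE']))
print(next(proxy))           # NAME (eaten normally, position 0)
with proxy.release() as p:
    print(p.eat(1))          # NAME (jumps: 'OP' and 'NAME' get cached)
    print(p.can_advance(2))  # True ('NEWLINE' is available, and now cached too)
print(next(proxy))           # OP      (served from the cache)
print(next(proxy))           # NAME    (served from the cache)
print(next(proxy))           # NEWLINE (cached by the can_advance call)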

Final Quest: Implementing the real backtracking

Since we have already created a way of eating / caching tokens from a lazy producer, we can safely go forward and modify the parser. A short summary of the mechanism;

> parse
    for token in tokens:
        add_token(token)
> add_token
    transition = token_to_transition(token)
    while True:
        try:
            plan = stack[-1].dfa.transitions[transition]
            break
        except KeyError:
            if stack[-1].dfa.is_final:
                self._pop()
            else:
                self.error_recovery(token)
                return
    stack[-1].dfa = plan.next_dfa

As you can see, it basically converts the input token to a transition, then finds the next plan by checking the transition table of the last state on the stack. When it finds a plan, it moves to that plan's next DFA and continues.

Let's continue step by step. The first thing is that we changed the token_to_transition function to output multiple transitions, so we need to change this part too.

-        transition = _token_to_transition(grammar, type_, value)
+        possible_transitions = _token_to_transition(grammar, type_, value)

Also, it would be too much for us to fit the logic of finding the next plan into a try/except, so we're going to split that out too.

while True:
    try:
        plan = get_possible_plan(possible_transitions, stack[-1].dfa.transitions)
        if plan is None:
            raise KeyError
        break
    except KeyError:
        ... # pop / error_recovery

The get_possible_plan function is going to return None only if there is nowhere to proceed with the given transitions. Just to preserve the old habit of a KeyError being raised (it used to come from indexing the transition table), we raise that KeyError ourselves.

Let's move forward with our precious get_possible_plan function, which will eventually do some kind of backtracking.

def get_possible_plan(possible_transitions, transition_table):
    possible_plans = []
    for transition in possible_transitions:
        possible_plan = transition_table.get(transition)
        if possible_plan is not None:
            possible_plans.append(possible_plan)

The function takes the possible_transitions and a transition_table to check them against. It creates a temporary list that will contain all possible routes for the given transitions. Then we go through all of the transitions and try to find a place for each of them in the transition table.

if len(possible_plans) == 0:
    return None
elif len(possible_plans) == 1:
    return possible_plans[0]

If there is nothing inside possible_plans, we just return None (which, as I mentioned before, means "we are done here"). If there is only one possible route, great: we can just return it and exit without any trouble. But what if there is more than one possible plan for the current transitions? That is where backtracking comes in.

dead_plans = set()
possible_plan_table = {possible_plan: [possible_plan] for possible_plan in possible_plans}

We are going to keep 2 different tables. One is a set of dead_plans, and the other is a mapping from each possible route to how far it can proceed. For example, if we have 2 possible routes and one can eat 3 more tokens before failing while the other can eat 4, we'll go for the latter.

with self._tokens.release() as token_proxy:
    counter = 0

    while self._superior_plan(possible_plan_table) is None:
        if not token_proxy.can_advance(counter):
            break # nothing to do, get the best plan we have
        token = token_proxy.eat(counter)
        next_transitions = _token_to_transition(grammar, *token[:2])

So we always check whether there is a superior plan with self._superior_plan, and once we find that every plan except one is dead, we choose that one, stop consuming from the token proxy, and re-lock it at the current state. If we can't advance any further, we just stop and return the best plan we have (which I don't expect to ever happen).
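
I haven't shown _superior_plan anywhere, so here is one way it might look. This is only my guess at the helper: I'm assuming it also receives the dead_plans set (the call sites in this post omit it), and that the 'winner' mode simply picks the plan whose chain advanced the furthest;

def _superior_plan(self, possible_plan_table, dead_plans=frozenset(), winner=False):
    # Plans that are not known to be dead yet.
    alive = [origin for origin in possible_plan_table if origin not in dead_plans]

    if len(alive) == 1:
        # Every alternative except one failed: that one is superior.
        return alive[0]

    if winner:
        # Forced decision (end of input / re-locking the proxy): pick the
        # plan that managed to advance the furthest.
        return max(possible_plan_table, key=lambda origin: len(possible_plan_table[origin]))

    # Still ambiguous, keep eating tokens.
    return None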

for origin, possible_plans in possible_plan_table.items():
    current_plan = possible_plans[-1]
    if origin in dead_plans:
        continue

    for dfa_push in reversed(current_plan.dfa_pushes):
        next_possible_plan = get_possible_plan(next_transitions, dfa_push.transitions)
        if next_possible_plan is not None:
            break
    else:
        dead_plans.add(origin)
        continue
    possible_plans.append(next_possible_plan)

Here comes the real backtracking. We iterate over all the possible plans and take the last state of each chain, which holds the transition table (current_plan). After that, we check whether this plan is a dead end or not. If it is, we just skip the backtracking step and continue with the next plan.

The for loop below that part iterates over all the DFA pushes, which are actually the rules the plan has entered. If we want to see what happens when we get the import of import x, the pushes look like this;

[
    <DFAState: stmt is_final=True>,
    <DFAState: simple_stmt is_final=False>,
    <DFAState: small_stmt is_final=True>,
    <DFAState: import_stmt is_final=True>,
    <DFAState: import_name is_final=False>
]

The reason we iterate over a reversed version of it is to find a place where we can proceed. When we find a push that actually produces a result with our current transitions, we choose to go with it and exit the loop. If we can't find anything that works, we add this plan's root to the dead_plans set and never try to progress on it again.

And finally, once we have found either a superior plan or consumed all the input (which I expect not to happen unless the input is somehow invalid), we just exit with the superior plan;

return self._superior_plan(possible_plan_table, winner = True)
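
For reference, here is one way all of the fragments above could fit together as a single method. Treat it as a sketch: the method name, the grammar parameter, the dead_plans argument to _superior_plan, the counter increment and the _first_plan helper (standing in for the recursive get_possible_plan lookup used for the lookahead step) are my own glue, not necessarily how the real code is organised;

def _first_plan(possible_transitions, transition_table):
    # Return the first plan that matches any of the given transitions.
    for transition in possible_transitions:
        plan = transition_table.get(transition)
        if plan is not None:
            return plan
    return None

def _get_possible_plan(self, grammar, possible_transitions, transition_table):
    possible_plans = []
    for transition in possible_transitions:
        possible_plan = transition_table.get(transition)
        if possible_plan is not None:
            possible_plans.append(possible_plan)

    if len(possible_plans) == 0:
        return None
    elif len(possible_plans) == 1:
        return possible_plans[0]

    dead_plans = set()
    possible_plan_table = {plan: [plan] for plan in possible_plans}

    with self._tokens.release() as token_proxy:
        counter = 0
        while self._superior_plan(possible_plan_table, dead_plans) is None:
            if not token_proxy.can_advance(counter):
                break  # nothing left to eat, settle for the best plan we have
            token = token_proxy.eat(counter)
            next_transitions = _token_to_transition(grammar, *token[:2])

            for origin, plans in possible_plan_table.items():
                if origin in dead_plans:
                    continue
                current_plan = plans[-1]
                # Search the entered rules from the innermost one outwards
                # for a state that can consume the lookahead token.
                for dfa_push in reversed(current_plan.dfa_pushes):
                    next_plan = _first_plan(next_transitions, dfa_push.transitions)
                    if next_plan is not None:
                        break
                else:
                    dead_plans.add(origin)
                    continue
                plans.append(next_plan)

            counter += 1  # move the lookahead window one token forward

    return self._superior_plan(possible_plan_table, dead_plans, winner=True)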

This was it. I really enjoyed developing this, since it sounded like a really fun thing to experiment with from the beginning. By the way, if you have similar thoughts or new ideas about this subject, just let me know at twitter/@isidentical, I would really like to hear from you.
