Who loves restrictions? Even though they might provide some speed-ups or ensure the simplicity of algorithms, who actually loves them? I guess the answer is: most of the 'reasonable' people. But I can't say that I am one of them when I want to have some fun, so I'll go ahead and try to bring backtracking to a 3rd party, simple, push-down, stateful LL(1) Python parser.
Disclaimer: this post is written with no intent of producing production-ready code; it is just a fun experiment to see whether we can do it or not.
What is 'Backtracking'?
The simplest (but definitely not the most formal) way of describing this concept, I guess, is an analogy with database transactions and rollbacks. During a parse operation, when we encounter multiple transitions for a single token and the current state is suitable for moving forward on more than one of the transitions the input produced, every alternative path is going to be represented as a transaction. When starting to process an alternative, we will save the current state and locals (like the input position, since we are going to eat tokens to determine whether we are on the right path or not), and then, if we fail, we'll roll back to the state we saved. If we succeed, we'll move on.
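To make the analogy concrete, here is a minimal sketch of that idea; the parser object, its position attribute, and ParseError are hypothetical stand-ins for illustration, not the API of the parser we'll modify later:

class ParseError(Exception):
    pass

def try_alternatives(parser, alternatives):
    saved_position = parser.position          # save state ("begin transaction")
    for alternative in alternatives:
        try:
            return alternative(parser)        # success, keep the progress ("commit")
        except ParseError:
            parser.position = saved_position  # failure, restore state ("rollback")
    raise ParseError("no alternative matched")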
Multiple States from a 'Single Token'
If you have worked with a language where identifiers have the same form as other language constructs, you know the concept of 'keywords'. Let's imagine the 'else' keyword: in theory, it fits the shape of a normal identifier, and it would be possible for you to assign to it. But state machines that need to determine routes based on tokens don't check the value of every different token; instead, whenever a value is reserved as a string in the grammar, they use the value of identifier tokens (rather than their type) as distinct states on the transition table.
def token_to_transition(parser, token):
    if token in parser.reserved_strings:
        return token.value
    else:
        return token.type
But you might ask: there is still only one transition returned from that function, even when the input is a keyword, and that means there are no alternatives. You are right. With this kind of implementation, you can not use reserved strings (a.k.a. keywords) in other places, even where the type of the token (NAME) would fit. You can only use them in the spots where you wrote 'else' in the raw grammar. At least not without 'soft keywords'.
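As a toy illustration (the table below is made up, not the real grammar's): in a state that only accepts plain identifiers, the keyword-derived transition has no route, so something like else = 1 can never parse:

transitions = {'NAME': 'assignment_target'}  # a made-up transition table

transition = 'else'      # token_to_transition() routes the keyword by value
transitions[transition]  # KeyError: 'else' -- no alternative route to try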
Soft Keywords
Uhm, another new concept that got into our lives. I initially saw this one in a commit to CPython, and saw the first real example of it when reading PEP 622. It is a proposal to bring a 'pattern matching statement' to Python. Since introducing a new statement should be handled with a keyword, they chose to use 'match' and 'case'. But reserving another name in a new Python version would break a lot of code, a lot. It would even break use cases in the stdlib (e.g. re.match()); of course, the authors considered that too, and they made these 2 keywords 'soft'. The keywords act like normal keywords in the right contexts, and on other occasions they act like a usual NAME token. To make that work, when there is some kind of ambiguity, the parser tries all alternatives and backtracks. Let's draft a token_to_transition function that supports 'soft' keywords. Now the token_to_transition function will return either a 1-element tuple or a 2-element tuple depending on the code path.
def token_to_transition(parser, token):
    if token in parser.soft_keywords:
        # Try the keyword route first, then fall back to a plain NAME.
        return token.value, token.type
    elif token in parser.reserved_strings:
        return token.value,
    else:
        return token.type,
So, if you make 'import' a soft keyword, then when the parser is in a state that expects either a NAME or an 'import' keyword, it will try both and see which one of the paths continues successfully.
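Here is a self-contained sketch of that behavior; Token and FakeParser are simplified stand-ins for illustration only, and I'm assuming membership is checked by the token's value:

from collections import namedtuple

Token = namedtuple('Token', ['type', 'value'])

class FakeParser:
    soft_keywords = {'match', 'case'}
    reserved_strings = {'else', 'import'}

def token_to_transition(parser, token):
    if token.value in parser.soft_keywords:
        return token.value, token.type
    elif token.value in parser.reserved_strings:
        return token.value,
    else:
        return token.type,

print(token_to_transition(FakeParser, Token('NAME', 'match')))  # ('match', 'NAME')
print(token_to_transition(FakeParser, Token('NAME', 'else')))   # ('else',)
print(token_to_transition(FakeParser, Token('NAME', 'x')))      # ('NAME',)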
Let's Implement
So, the first thing we need is to determine some kind of strategy about whether we are going to consume all tokens beforehand and fill the memory, or not. As you might remember from my previous post, it is not uncommon for parsers to take lazy lexers as their producer and consume the input one token at a time as they proceed. They also don't store previously produced tokens. But when we're going to do backtracking, we need to consume 'enough' tokens beforehand, and we also need to roll back when we fail to proceed on an alternative.
Since consuming all the tokens beforehand wouldn't be as pleasant as what we were used to in the times before backtracking, I plan to operate in the best way I can: act as usual when there is no ambiguity or place to backtrack, and when there is, consume as little as we can; as soon as we are in the clear, go back to the old way and output tokens one by one.
def parse(self, tokens):
    first_dfa = self._pgen_grammar.nonterminal_to_dfas[self._start_nonterminal][0]
    self.stack = Stack([StackNode(first_dfa)])
+   self._tokens = _TokenGeneratorProxy(tokens)
-   for token in tokens:
+   for token in self._tokens:
        self._add_token(token)
Let's try to draft a _TokenGeneratorProxy class;
class _TokenGeneratorProxy:
    def __init__(self, generator):
        self._tokens = generator
        self._counter = 0
        self._release_ranges = []  # lookahead records, introduced further below

    def __iter__(self):
        return self

    def __next__(self):
        token = next(self._tokens)
        self._counter += 1
        return token
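Before extending it, a quick sanity check (a sketch with a made-up stand-in tokenizer, not a real lexer) that the proxy is, so far, a fully transparent wrapper:

def tokens():  # a stand-in for a lazy tokenizer
    yield from ['NAME', 'OPERATOR', 'NAME', 'NEWLINE']

proxy = _TokenGeneratorProxy(tokens())
for token in proxy:
    print(token)  # tokens still arrive one by one; nothing is cached yet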
What we have here is just the iterator protocol: __iter__ (which returns the iterator) and __next__ (the function that is called on every iteration). For now, we're just keeping a counter and returning the value we get from the original producer. But what we need is a way of looking ahead without breaking the whole mechanism. What I have in mind is something like this;
def process(self, token):
    ...
    self.eat_and_print_next_token()
    with self._tokens.release() as proxy:
        for needed in range(2):
            proxy.eat(needed)
        print('end of for loop')
        proxy.eat(0)
    self.eat_and_print_next_token()
    self.eat_and_print_next_token()
    self.eat_and_print_next_token()
Which in theory should work like this;
eating 'NAME' as the position '0'
releasing the producer
eating 'OPERATOR' as the position '0'
eating 'NAME' as the position '1'
end of for loop
using cached token 'OPERATOR' from the position '0'
locking the producer
using cached token 'OPERATOR' from the position '1'
using cached token 'NAME' from the position '2'
eating 'NEWLINE' as the position '3'
It might look a little bit complex, but if I have to explain it, I'd say that without any 'release' operation, the proxy just consumes from the producer and outputs tokens without storing anything. When there is a release action, it locks a point (the counter) and records all the tokens it consumes until it is locked again (on exit of the context manager). While released, proxy.eat works on a relative range: if you have eaten 1 token (index 0) before the lock operation and then perform proxy.eat(0), it will eat the token at absolute index 1. Also, performing proxy.eat(0) after a proxy.eat(0) will return the same thing (if performed within the same release). And it doesn't matter how far you have eaten inside a lock: the position will be reset to the point of the lock after the re-locking operation, and the tokens you have already consumed will come up from the cache.
from contextlib import contextmanager

@contextmanager
def release(self):
    self._release_ranges.append([self._counter, None, []])
    try:
        yield self
    finally:
        # Lock the last release range to the final position that
        # has been eaten.
        total_eaten = len(self._release_ranges[-1][2])
        self._release_ranges[-1][1] = self._counter + total_eaten
The _release_ranges is a list of 3-item records. The first item points to the state where we started eating tokens, the second points to the end (which is set after the re-locking operation), and the third is another list that consists of the tokens that were eaten in that range.
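For instance, a hypothetical record that was released at counter 2, ate three tokens, and was then re-locked would look like this:

[
    2,                                           # start: counter at release time
    5,                                           # end: set on re-lock (2 + 3 eaten)
    [<token at 2>, <token at 3>, <token at 4>],  # tokens cached within the range
]

With that shape in mind, here is the eat method;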
def eat(self, point):
    eaten_tokens = self._release_ranges[-1][2]
    if point < len(eaten_tokens):
        return eaten_tokens[point]
    else:
        while point >= len(eaten_tokens):
            token = next(self._tokens)
            eaten_tokens.append(token)
        return token
The eaten_tokens (the last element of the record) is an ordered list, so the index will point to the actual token that lies at self._counter + point. It will also work if you try to jump.
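For example (a sketch, assuming proxy is a _TokenGeneratorProxy over some token stream), jumping straight to a far point fills the cache along the way, and earlier points are then served from it:

with proxy.release() as p:
    third = p.eat(2)  # pulls 3 tokens from the producer (relative indices 0-2)
    first = p.eat(0)  # already cached, the producer isn't touched

Next, __next__ needs to learn to serve tokens from these caches;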
def __next__(self):
    # If the current position is already compromised (looked up),
    # return the eaten token; if not, just go further on the given
    # token producer.
    for start, end, tokens in self._release_ranges:
        assert end is not None
        if start <= self._counter < end:
            token = tokens[self._counter - start]
            break
    else:
        token = next(self._tokens)

    self._counter += 1
    return token
And finally, we need an extra for loop to go through all release records and try to match the current point; if one is found, we just use the cached token. If not, we'll consume a new token from the producer.
def can_advance(self, to):
    # Try to eat; fail if it can't. The eat operation is cached,
    # so there won't be any additional cost of eating here.
    try:
        self.eat(to)
    except StopIteration:
        return False
    else:
        return True
Since we are already caching the results of eat, it is much easier to implement a can_advance functionality, which will just try to go to the selected point; if the producer can produce no more, it will return False. Otherwise, it will result in True.
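As a small, hypothetical usage example inside the parser: before committing to an alternative that needs two more tokens, we can first check that they actually exist:

with self._tokens.release() as proxy:
    if proxy.can_advance(1):  # at least 2 more tokens (indices 0 and 1) exist
        lookahead = [proxy.eat(0), proxy.eat(1)]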
Final Quest: Implementing the real backtracking
Since we have already created the way of eating / caching tokens from a lazy producer, we can safely go forward and modify the parser. As a short summary of the mechanism;
> parse
    for token in tokens:
        add_token(token)

> add_token
    transition = token_to_transition(token)
    while True:
        try:
            plan = stack[-1].dfa.transitions[transition]
            break
        except KeyError:
            if stack[-1].dfa.is_final:
                self._pop()
            else:
                self.error_recovery(token)
                return
    stack[-1].dfa = plan.next_dfa
As you might see, it basically converts the input to a transition, then finds the next state by checking the transition table of the last state. When it finds the next plan, it sets it and continues.
Let's continue step by step. The first thing is that we changed the token_to_transition function to output multiple transitions, so we need to change this part too.
- transition = _token_to_transition(grammar, type_, value)
+ possible_transitions = _token_to_transition(grammar, type_, value)
Also, it is going to be too much for us to fit the functionality of finding the next state into a try/except, so we're just going to split that out too.
while True:
    try:
        plan = self.get_possible_plan(possible_transitions, stack[-1].dfa.transitions)
        if plan is None:
            raise KeyError
        break
    except KeyError:
        ... # pop / error_recovery
get_possible_plan is going to return None only if there is nowhere to proceed with the given transitions. Just to preserve the old habit of raising KeyError, we're going to raise that KeyError ourselves (it used to be raised by the access to the transition table).
Let's move forward with our precious get_possible_plan function, which will eventually do some kind of backtracking.
def get_possible_plan(self, possible_transitions, transition_table):
    possible_plans = []
    for transition in possible_transitions:
        possible_plan = transition_table.get(transition)
        if possible_plan is not None:
            possible_plans.append(possible_plan)
The function gets the possible_transitions and a transition_table to check them against. It creates a temporary list that is going to contain all possible routes for the given transitions. Then we go through all of them and try to find a place for each in the transition table.
    if len(possible_plans) == 0:
        return None
    elif len(possible_plans) == 1:
        return possible_plans[0]
If there is nothing inside possible_plans, we just return None (which, as I mentioned before, is the equivalent of "we are done here"). If there is only one possible route, that's nice: we can just return it and exit without any trouble. But what if there is more than one possible plan for the current transitions? That is where backtracking comes in.
    dead_plans = set()
    possible_plan_table = {possible_plan: [possible_plan] for possible_plan in possible_plans}
We are going to keep 2 different tables. One is a set of dead_plans, and the other is a mapping from each possible route to how far it can proceed. For example, if we have 2 possible routes, and one can eat 3 more tokens and then fail while the other can eat 4 and fail, we'll go for the latter.
    with self._tokens.release() as token_proxy:
        counter = 0
        while self._superior_plan(possible_plan_table) is None:
            if not token_proxy.can_advance(counter):
                break  # nothing to do, get the best plan we have

            token = token_proxy.eat(counter)
            next_transitions = _token_to_transition(self._pgen_grammar, *token[:2])
So we are always going to check whether there is a superior plan with self._superior_plan, and if we find that all plans except one are dead, we choose that one, stop consuming from the token proxy, and re-lock the current state. If we can't go further, we just stop and return the best plan we have (which I don't expect will ever happen).
            for origin, possible_plans in possible_plan_table.items():
                current_plan = possible_plans[-1]
                if origin in dead_plans:
                    continue

                for dfa_push in reversed(current_plan.dfa_pushes):
                    next_possible_plan = self.get_possible_plan(next_transitions, dfa_push.transitions)
                    if next_possible_plan is not None:
                        break
                else:
                    dead_plans.add(origin)
                    continue

                possible_plans.append(next_possible_plan)
            counter += 1  # advance the lookahead to the next token
Here comes the real backtracking. We move over all possible plans and find the current state, which holds the transition table (current_plan). After that, we check whether this plan is a dead end or not. If it is, we just skip the backtracking step and continue with the next plan. The for loop below that part iterates over all the DFA pushes, which are actually the rules it has entered. If we wanted to see what happens when we get the import of import x, the pushes look like this;
[
    <DFAState: stmt is_final=True>,
    <DFAState: simple_stmt is_final=False>,
    <DFAState: small_stmt is_final=True>,
    <DFAState: import_stmt is_final=True>,
    <DFAState: import_name is_final=False>
]
The reason we iterate over a reversed version of it is to find a place where we can proceed. When we get a push that actually produces results with our current strategy, we choose to go with it and exit the loop. If we can't find anything that works, we just add this plan's root to the dead_plans set and never try to progress on it again.
And finally, once we have either found a superior plan or consumed all the input (which I don't expect to happen unless the input is somehow invalid), we just exit with the superior plan;
    return self._superior_plan(possible_plan_table, winner=True)
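One missing piece: _superior_plan itself is never shown in this post. Here is a minimal sketch of one way it could work, under the assumption that dead routes stop growing, a route is 'superior' once it is strictly ahead of every other alternative, and winner=True forces a decision; treat it as an illustration rather than the actual implementation:

def _superior_plan(self, possible_plan_table, winner=False):
    # Order the routes by how far they managed to proceed; dead routes
    # stop growing, so the longest route is the one still advancing.
    routes = sorted(possible_plan_table.values(), key=len, reverse=True)
    if winner:
        # Time to decide: take the route that got the furthest.
        return routes[0][0]
    if len(routes) > 1 and len(routes[0]) == len(routes[1]):
        # Two or more routes are still neck and neck; keep eating tokens.
        return None
    # Exactly one route is ahead of the rest; return its origin plan.
    return routes[0][0]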
This was it. I really enjoyed developing this, since it sounded like a really fun thing to experiment with from the very beginning. By the way, if you have similar thoughts or new ideas about this subject, just let me know at twitter/@isidentical; I would really like to hear from you.