"""
"""
from __future__ import annotations
import contextlib
import copy
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple, Union
import urwid
import lookatme.config
import lookatme.utils as utils
from lookatme.widgets.clickable_text import ClickableText
from lookatme.render.token_iterator import TokenIterator
[docs]@dataclass
class ContainerInfo:
container: urwid.Widget
meta: Dict[str, Any]
source_token: Optional[Dict]
[docs]class Context:
""" """
def __init__(self, loop: Optional[urwid.MainLoop]):
self.loop = loop
self.container_stack: List[ContainerInfo] = []
self.tag_stack: List[Tuple[str, Dict, bool]] = []
self.spec_stack = []
self.source_stack = []
self.inline_render_results = []
self.token_stack: List[TokenIterator] = []
self._unwind_tokens: List[Tuple[bool, Dict]] = []
self.validation_states = {}
self.level = 0
self.in_new_block = True
self._log = lookatme.config.get_log()
[docs] def clone(self) -> Context:
"""Create a new limited clone of the current context.
The only data that is actually cloned is the source and spec stacks.
"""
res = Context(self.loop)
res.source_stack = list(self.source_stack)
res.spec_stack = list(self.spec_stack)
res.token_stack = list(self.token_stack)
res.container_stack = list(self.container_stack)
return res
def _validate_empty_list(self, stack_name: str, stack: List[Any]):
if len(stack) == 0:
return
raise RuntimeError(
"The {} stack did not return to empty, {} items remaining".format(
stack_name, len(stack)
)
)
def _set_validation_state(self, name: str, container: Any):
self.validation_states[name] = (container, len(container))
[docs] def clean_state_snapshot(self):
self._set_validation_state("container", self.container_stack)
self._set_validation_state("tag", self.tag_stack)
self._set_validation_state("spec", self.spec_stack)
self._set_validation_state("source", self.source_stack)
self._set_validation_state("tokens", self.token_stack)
self._set_validation_state("inline render results", self.inline_render_results)
[docs] def clean_state_validate(self):
"""Validate that all stacks are empty, that everything unwound correctly"""
errors = []
for list_name, (container, expected) in self.validation_states.items():
if len(container) != expected:
errors.append(
"{} should have been {}, was {}".format(
list_name,
expected,
len(container),
)
)
if self.level != 0:
errors.append("Context level did not return to 0, is {}".format(self.level))
if errors:
raise RuntimeError(
"Context did not unwind correctly:\n\n{}".format(
"\n".join(" " + error for error in errors)
)
)
[docs] def fake_token(self, type: str, **kwargs) -> Dict:
res = {"type": type, **kwargs}
res["map"] = self.tokens.curr["map"]
self.tokens._handle_unwind(res)
return res
@property
def source(self) -> str:
"""Return the current markdown source"""
if not self.source_stack:
raise ValueError("No source has been set!")
return self.source_stack[-1]
[docs] def source_push(self, new_source: str):
"""Push new markdown source onto the source stack"""
self.source_stack.append(new_source)
[docs] def source_pop(self) -> str:
"""Pop the latest source off of the source stack"""
if not self.source_stack:
raise ValueError("Tried to pop off the source_stack one too many times")
return self.source_stack.pop()
[docs] def source_get_token_lines(
self,
token: Dict,
extra: int = 5,
with_lines: bool = True,
with_marker: bool = True,
) -> List[str]:
lines = self.source.split("\n")
range_start, range_end = token["map"]
before_extra = range_start - max(range_start - extra, 0)
after_extra = min(len(lines), range_end + extra) - range_end
source_lines = self.source.split("\n")[
range_start - before_extra : range_end + after_extra
]
if with_lines:
max_line_len = len(str(range_end + extra))
for idx, line in enumerate(source_lines):
source_lines[idx] = "{{line_no:{}}} {{line}}".format(
max_line_len
).format(
line_no=range_start - before_extra + idx + 1,
line=line,
)
if with_marker:
for idx, line in enumerate(source_lines):
if idx >= before_extra and idx < len(source_lines) - after_extra:
source_lines[idx] = line = " HERE-> " + line
else:
source_lines[idx] = line = " " + line
return source_lines
def _create_unwind_tokens(self, from_idx: int = 0) -> List[Dict]:
_, latest_map_end = self.tokens.curr["map"]
latest_map = [latest_map_end - 1, latest_map_end]
res = []
idx = from_idx
curr_inlines = []
while idx < len(self._unwind_tokens):
is_inline, token = self._unwind_tokens[idx]
idx += 1
token["map"] = latest_map
if is_inline:
curr_inlines.append(token)
continue
if curr_inlines:
res.append(
{
"type": "inline",
"map": latest_map,
"children": curr_inlines,
}
)
curr_inlines.clear()
res.append(token)
if curr_inlines:
res.append(
{
"type": "inline",
"map": latest_map,
"children": curr_inlines,
}
)
return list(reversed(res))
@property
def unwind_tokens(self) -> List[Dict]:
"""Generate a list of unwind (close) tokens from the token iterators
in the stack
"""
return self._create_unwind_tokens()
[docs] def unwind_tokens_from(self, bookmark: int) -> List[Dict]:
return self._create_unwind_tokens(bookmark)
@property
def unwind_bookmark(self) -> int:
return len(self._unwind_tokens)
@property
def unwind_tokens_consumed(self) -> List[Dict]:
"""Generate a list of unwind (close) tokens from the token iterators
in the stack
"""
res = self.unwind_tokens
self._unwind_tokens.clear()
return res
@property
def tokens(self) -> TokenIterator:
"""Return the current token iterator"""
if not self.token_stack:
raise ValueError("Attempted to fetch tokens without providing any")
return self.token_stack[-1]
[docs] def tokens_push(self, tokens: List[Dict], inline: bool = False):
""" """
self.token_stack.append(
TokenIterator(copy.deepcopy(tokens), self._unwind_tokens, inline)
)
[docs] def tokens_pop(self):
""" """
return self.token_stack.pop()
[docs] @contextlib.contextmanager
def use_tokens(self, tokens, inline: bool = False):
"""Create a context manager for pushing/popping tokens via a with block"""
self.tokens_push(tokens, inline)
yield
# do not pop tokens when an exception occurrs! We want to have full
# context when handlingn errors!
self.tokens_pop()
[docs] def ensure_new_block(self):
"""Ensure that we are in a new block"""
if not self.in_new_block:
utils.pile_or_listbox_add(self.container, self.inline_widgets_consumed)
self.widget_add(self.wrap_widget(urwid.Divider()))
self.in_new_block = True
[docs] def widget_add(
self, w: Union[List[urwid.Widget], urwid.Widget], wrap: Optional[bool] = False
):
"""Add the provided widget to the current container."""
if wrap:
if isinstance(w, (list, tuple)):
w = [self.wrap_widget(x) for x in w]
else:
w = self.wrap_widget(w)
self.in_new_block = False
utils.pile_or_listbox_add(self.container, w)
[docs] @contextlib.contextmanager
def level_inc(self):
""" """
self.level += 1
yield
# do not pop tokens when an exception occurrs! We want to have full
# context when handlingn errors!
self.level -= 1
[docs] def log_debug(self, msg):
indent = " " * self.level
self._log.debug(indent + msg)
[docs] def inline_push(
self,
inline_result: Union[urwid.Widget, str, Tuple[Optional[urwid.AttrSpec], str]],
):
"""Push a new inline render result onto the stack. Either a widget, or
text markup
"""
if isinstance(inline_result, str) and len(inline_result) == 0:
return
if isinstance(inline_result, tuple) and len(inline_result) == 2:
if len(inline_result[1]) == 0:
return
if inline_result[0] is None:
inline_result = inline_result[1]
self.in_new_block = False
self.inline_render_results.append(inline_result)
[docs] def inline_clear(self):
"""Clear the inline rendered results"""
self.inline_render_results.clear()
[docs] def get_inline_markup(self):
"""Return the current inline markup, ignoring any widgets that may
have made it in
"""
res = filter(
lambda x: not isinstance(x, urwid.Widget), self.inline_render_results
)
return list(res)
[docs] def wrap_widget(
self, w: urwid.Widget, spec: Optional[urwid.AttrSpec] = None
) -> urwid.Widget:
"""Wrap the provided widget with an AttrMap that will apply
the current styling to the entire widget (using spec_general)
"""
if spec is None:
spec = self.spec_general
return urwid.AttrMap(w, {None: spec})
[docs] def get_inline_widgets(self):
"""Return the results of any inline rendering that has occurred in
Widget form. The conversion here is necessary since inline rendering
functions often produce urwid Text Markup instead of widgets, which
need to be converted to ClickableText.
"""
res = []
curr_text_markup = []
for render_res in self.inline_render_results:
if isinstance(render_res, urwid.Widget):
if curr_text_markup:
res.append(ClickableText(curr_text_markup))
curr_text_markup = []
res.append(render_res)
if isinstance(render_res, str) or (
isinstance(render_res, (tuple, list)) and len(render_res) == 2
):
curr_text_markup.append(render_res)
if len(curr_text_markup) > 0:
res.append(ClickableText(curr_text_markup))
# res = [urwid.AttrMap(x, {None: self.spec_text}) for x in res]
return res
[docs] def inline_flush(self):
"""Add all inline widgets to the current container"""
self.widget_add(self.inline_widgets_consumed)
[docs] def inline_convert_all_to_widgets(self):
self.inline_render_results = self.inline_widgets_consumed
@property
def inline_markup_consumed(self):
"""Return and clear the inline markup"""
res = self.get_inline_markup()
self.inline_clear()
return res
@property
def inline_widgets_consumed(self):
"""Return and clear the inline widgets"""
res = self.get_inline_widgets()
self.inline_clear()
return res
@property
def is_literal(self):
return self.tag_is_ancestor("pre")
[docs] def tag_is_ancestor(self, ancestor_tag_name: str) -> bool:
for tag_name, _, _ in reversed(self.tag_stack):
if tag_name == ancestor_tag_name:
return True
return False
[docs] def tag_push(self, new_tag: str, token: Dict, spec=None, text_only_spec=False):
"""Push a new tag name onto the stack"""
if spec is not None:
self.spec_push(spec, text_only=text_only_spec)
self.tag_stack.append((new_tag, token, spec is not None))
[docs] def tag_pop(self):
"""Pop the most recent tag off of the tag stack"""
if not self.tag_stack:
raise ValueError("Tried to pop off the tag stack one too many times")
popped_tag, _, had_spec = self.tag_stack.pop()
if had_spec:
self.spec_pop()
return (popped_tag, had_spec)
@property
def tag(self):
if self.tag_stack:
return None
return self.tag_stack[-1][0]
@property
def tag_token(self) -> Dict:
if not self.tag_stack:
raise ValueError("Tried to get the token for a non-existent tag")
return self.tag_stack[-1][1]
@property
def meta(self) -> Dict[Any, Any]:
"""Return metadata associated with the current container, or None
if the current container is None.
"""
if not self.container_stack:
raise ValueError("Tried to get meta with no containers!")
return self.container_stack[-1].meta
[docs] def container_push(
self,
new_item: urwid.Widget,
is_new_block: bool,
custom_add: Optional[urwid.Widget] = None,
):
"""Push to the stack and propagate metadata"""
if custom_add is not None:
custom_add = self.wrap_widget(custom_add)
else:
new_item = self.wrap_widget(new_item)
new_meta = {}
if self.container_stack:
new_meta.update(self.meta)
self.inline_flush()
if custom_add is None:
utils.pile_or_listbox_add(self.container, new_item)
else:
utils.pile_or_listbox_add(self.container, custom_add)
elif self.inline_render_results:
raise Exception("How do you have render results with no containers?")
new_info = ContainerInfo(
container=new_item, meta=new_meta, source_token=self.tokens.curr
)
self.container_stack.append(new_info)
self.in_new_block = is_new_block
[docs] def container_pop(self) -> ContainerInfo:
"""Pop the last element off the stack. Returns the popped widget"""
if not self.container_stack:
raise ValueError("Tried to pop off the widget stack one too many times")
self.inline_flush()
return self.container_stack.pop()
@property
def container(self) -> urwid.Widget:
"""Return the current container"""
if not self.container_stack:
raise ValueError("Tried to get container with no containers")
return self.container_stack[-1].container
# @property
# def container_last(self):
# if self.container is None:
# return None
#
# cont_children = []
# if hasattr(self.container, "body"):
# cont_children = self.container.body
# elif hasattr(self.container, "contents"):
# cont_children = self.container.contents
#
# if len(cont_children) == 0:
# return None
# return cont_children[-1]
[docs] @contextlib.contextmanager
def use_container(
self,
new_container: urwid.Widget,
is_new_block: bool,
custom_add: Optional[urwid.Widget] = None,
):
"""Ensure that the container is pushed/popped correctly"""
self.container_push(new_container, is_new_block, custom_add)
yield
# do not pop tokens when an exception occurrs! We want to have full
# context when handlingn errors!
self.container_pop()
[docs] @contextlib.contextmanager
def use_container_tmp(self, new_container: urwid.Widget):
"""Swap out the entire container stack for a new one with the
new container as the only item, while keeping spec and tag stacks
"""
tmp_inline_render_results = self.inline_render_results
tmp_container_stack = self.container_stack
tmp_in_new_block = self.in_new_block
self.inline_render_results = []
self.container_stack = []
self.container_push(new_container, is_new_block=True)
yield
# do not pop tokens when an exception occurrs! We want to have full
# context when handlingn errors!
self.container_pop()
self.inline_render_results = tmp_inline_render_results
self.container_stack = tmp_container_stack
self.in_new_block = tmp_in_new_block
[docs] def spec_push(self, new_spec, text_only=False):
"""Push a new AttrSpec onto the spec_stack"""
self.spec_stack.append((new_spec, text_only))
[docs] def spec_pop(self):
"""Push a new AttrSpec onto the spec_stack"""
if not self.spec_stack:
raise ValueError("Tried to pop off the spec stack one too many times")
return self.spec_stack.pop()
[docs] def spec_peek(self) -> urwid.AttrSpec:
"""Return the most recent spec, or None"""
if not self.spec_stack:
raise ValueError("Tried to pop off the spec stack one too many times")
return self.spec_stack[-1][0]
@property
def spec_general(self) -> Union[None, urwid.AttrSpec]:
"""Return the current fully resolved current AttrSpec"""
return utils.spec_from_stack(
self.spec_stack,
lambda s, text_only: not text_only,
)
@property
def spec_text(self) -> urwid.AttrSpec:
""" """
return utils.spec_from_stack(
self.spec_stack,
# include all of the specs!
lambda s, text_only: True,
)
[docs] def spec_text_with(self, other_spec: Union[None, urwid.AttrSpec]) -> urwid.AttrSpec:
if other_spec is None:
return self.spec_text
return utils.spec_from_stack(
[(self.spec_text, True), (other_spec, True)],
)
[docs] @contextlib.contextmanager
def use_spec(self, new_spec, text_only=False):
"""Ensure that specs are pushed/popped correctly"""
if new_spec is not None:
self.spec_push(new_spec, text_only)
yield
# do not pop tokens when an exception occurrs! We want to have full
# context when handlingn errors!
if new_spec is not None:
self.spec_pop()