    tool for interconverting/querying xml/csv/json/jsonl/regex matches

    • Mark Olson

      I’ve noticed enough general questions along the lines of “how do I convert between x format and y format” (e.g., CSV to XML) that I figured I’d just make one catch-all tool for doing most transformations you might want inside Notepad++ without needing to know any programming languages (although it does help).

      This tool requires PythonScript 3.0.13 or newer.

      Simply run the script and follow the instructions. When prompted for a query, you can leave the default query (x, which selects all the data) unchanged if you want to transform the file without filtering.
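
To give a feel for what a query looks like, here is a minimal sketch in plain Python that runs outside Notepad++. The sample rows and the column names (CITY, POP) are made up for illustration; inside the tool, x would hold the parsed contents of the current file, and more helpers (mean, group_by, inner_join, math, collections) would be in scope.

```python
import re

# Hypothetical sample data standing in for a parsed CSV file.
# In the tool, "x" is bound to the rows of the current file.
x = [
    {'CITY': 'albany', 'POP': 99224.0},
    {'CITY': 'boston', 'POP': 675647.0},
    {'CITY': 'austin', 'POP': 961855.0},
]

# A filter query, written exactly as you would type it at the prompt
query = "[row for row in x if row['CITY'].startswith('a')]"
# The script evaluates the query string with eval, with "x" in scope
result = eval(query, {'x': x, 're': re})
print(result)

# Queries can also aggregate, or use functions from the re module
total_pop = eval("sum(row['POP'] for row in x)", {'x': x})
matches = eval("[row['CITY'] for row in x if re.search('^b', row['CITY'])]",
               {'x': x, 're': re})
print(total_pop, matches)
```

Any expression that returns rows, a dict, or a single value should work as a query, since the result is tabularized before being written out.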

      It has not been thoroughly tested, so bug reports are appreciated.

      THE FILE IS PASTED IN TWO PARTS BETWEEN THIS POST AND THE NEXT POST (due to post length limit). YOU NEED TO INCLUDE BOTH PARTS IN THE SAME FILE FOR THIS TO WORK.

      PART 1

      '''
      Simple tool for querying CSVs, JSON, and tabular XML files with Python list comprehensions, etc.
      1. Use the PythonScript console.
      2. Call the pyq function on a string containing a query,
          using the variable "x" to represent the data file.
          You can also pass in an output type (defaults to DEFAULT_OUTPUT_TYPE).
          Numbers are parsed as floats; everything else is parsed as a string.
          For example, pyq("[row for row in x if row['CITY'].startswith('a')]").
          The query can include:
          * any functions that are always available, like max, min, and sorted
          * the mean function (takes average of a list of numbers)
          * functions/classes from math, re, and collections.
          * a group_by function (see below)
          * an inner_join function for performing a SQL-style inner join (see below)
      3. The result of the query will be displayed in a new window.
          For best results, move the query file to the other view
          so that one view has your tabular file and the other view
          has the query results file.
      '''
      from Npp import *
      
      import csv
      import collections
      from datetime import datetime
      from io import StringIO
      import json
      import math
      import os
      import re
      from xml.etree import ElementTree as ET
      
      ### SETTINGS
      DIALOG_ON_RUN = True # open a pop-up dialog to run queries when the script is run (alternative: run from PythonScript console)
      VALID_DATA_TYPES = {'csv', 'json', 'jsonl', 'xml', 'regex'}
      FILE_EXTENSION_TYPE_MAPS = { # keys are file extensions, values are in VALID_DATA_TYPES
          'json': 'json',
          'xml': 'xml',
          'jsonl': 'jsonl',
          'csv': 'csv',
          'tsv': 'csv',
      }
      TYPES_TO_TEST = ['json', 'jsonl', 'xml'] # try these data types if file extension not helpful
      DEFAULT_OUTPUT_TYPE = 'csv' # must be in VALID_DATA_TYPES
      CSV_DEFAULT_OUTPUT_DIALECT = 'excel-tab' # tab-separated; use 'excel' for comma-separated
      AUTODETECT_BOOLEANS = True
      BOOLEAN_PAIRS = [ # case insensitive
          {'F': False, 'T': True},
          {'FALSE': False, 'TRUE': True},
          {'NO': False, 'YES': True},
          {'N': False, 'Y': True},
      ]
      BOOLEAN_VALS = {k: pair for pair in BOOLEAN_PAIRS for k in pair}
      DEFAULT_QUERY = 'x' # select all data
      DEFAULT_REGEX = '(?-s)^(.*)$' # select entire line
      ### END SETTINGS
      
      def now():
          return datetime.now().timestamp()
      
      def getFileExtension(fname):
          # os.path.splitext handles empty names and ignores dots in directory names;
          # returns '' if there is no extension
          return os.path.splitext(fname)[1][1:]
      
      PYQ_DIR = os.path.join(os.path.dirname(__file__), 'pyquery')
      
      num_regex = re.compile(r'[+-]?(?:(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][+-]?\d+)?|nan|inf)')
      
      #### QUERY FUNCTIONS ####
      
      def mean(rows):
          return sum(rows)/len(rows)
      
      
      def inner_join(rows1, rows2, var1, var2=None):
          '''
          Standard SQL-style inner join. var2 is set to var1 if not specified.
          If rows1 and rows2 are lists of lists,
              returns a list of dicts
                  where a 0-based index in a row is mapped to 'col{index}' for rows1
                                                            or 'col{index}_1' for rows2
          Otherwise, if rows2 has column names that are duplicates of column names in rows1,
              the duplicate names in rows2 are mangled by appending '_1'.
          EXAMPLE:
          >>> rows1 = [{'a': 1, 'b': 'foo', 'd': 5.2}, {'a': 3, 'b': 'bar', 'd': -1}]
          >>> rows2 = [{'a': 3, 'b': True, 'c': -2}, {'a': 1, 'b': False, 'c': -1}]
          >>> inner_join(rows1, rows2, 'a', 'b') # 1 and True are considered equal
      [{'a': 1, 'b': 'foo', 'd': 5.2, 'a_1': 3, 'b_1': True, 'c': -2}]
          >>> rows3 = [[1, 'a'], [2, 'b']]
          >>> rows4 = [[3, False, 'foo'], [2, True, 'bar'], [2, False, 'baz']]
          >>> inner_join(rows3, rows4, 0)
      [{'col0': 2, 'col1': 'b', 'col0_1': 2, 'col1_1': True, 'col2_1': 'bar'},
       {'col0': 2, 'col1': 'b', 'col0_1': 2, 'col1_1': False, 'col2_1': 'baz'}]
          >>> rows5 = [{'a': 1, 'b_1': 2, 'b': 1}]
          >>> rows6 = [{'a': 3, 'b': 1, 'c': -2, 'b_1': False}]
          >>> inner_join(rows5, rows6, 'a', 'b') # if k + '_1' is still duplicate, append '_1' until not duplicate
      [{'a': 1, 'b_1': 2, 'b': 1, 'a_1': 3, 'b_1_1': 1, 'c': -2, 'b_1_1_1': False}]
          '''
          if not (rows1 and rows2):
              return []
          if var2 is None:
              var2 = var1
          rows1_grouped = {}
          for row in rows1:
              val = row[var1]
              rows1_grouped.setdefault(val, []).append(row)
          rowtype = type(rows1[0])
          if rowtype == list:
              def row_combiner(row1, row2):
                  out = {f'col{ii}': v for ii, v in enumerate(row1)}
                  for ii, v in enumerate(row2):
                      out[f'col{ii}_1'] = v
                  return out
          else:
              shared_keys = set(rows1[0]) & set(rows2[0])
              def row_combiner(row1, row2):
                  out = {**row1}
                  for k, v in row2.items():
                      if k in shared_keys:
                          # append '_1' until the name no longer collides
                          mangled = k + '_1'
                          while mangled in out:
                              mangled += '_1'
                          out[mangled] = v
                      else:
                          out[k] = v
                  return out
          final = []
          for row2 in rows2:
              val = row2[var2]
              rows1_related = rows1_grouped.get(val)
              if rows1_related:
                  for row1 in rows1_related:
                      final.append(row_combiner(row1, row2))
          return final
      
      
      def group_by(rows, group_var, func, agg_var = None):
          '''
          rows: a list of dicts with str keys
          group_var: str, the column name to group by
          func: a function that operates on a list
              (must operate on list of dicts if agg_var is None)
              OR a list of such functions
          agg_var: a column name OR a list of column names.
      EXAMPLE:
      >>> group_by(rows, 'contaminated', max, 'zone')
      {
          "TRUE": 2.0,
          "FALSE": 4.0
      }
      >>> group_by(rows, 'contaminated', [mean, min], 'nums')
      {
          "TRUE": {
              "mean": 1.5,
              "min": 1.0
          },
          "FALSE": {
              "mean": 2.0,
              "min": 1.0
          }
      }
      >>> group_by(rows, 'names', [sum, min], ['nums', 'zone'])
      {
          "Bluds": {
              "nums": {"min": NaN, "sum": NaN},
              "zone": {"min": 1.0, "sum": 3.0}
          },
          "dfsd": {
              "nums": {"min": 0.5, "sum": 2.0},
              "zone": {"min": 2.0, "sum": 8.0}
          },
          "flodt": {
              "nums": {"min": 3.4, "sum": 3.4},
              "zone": {"min": 4.0, "sum": 4.0}
          }
      }
          '''
          row_groups = {}
          for row in rows:
              val = row[group_var]
              row_groups.setdefault(str(val), []).append(row)
          if isinstance(func, list):
              new_func = lambda x: {subfunc.__name__ : subfunc(x)
                                  for subfunc in func}
          else:
              new_func = func
          if isinstance(agg_var, list):
              return {group_val:
                          {sub_var: new_func([row[sub_var] for row in rows])
                                      for sub_var in agg_var}
                        for group_val, rows in row_groups.items()}
          elif isinstance(agg_var, str):
              return {group_val: new_func([row[agg_var] for row in rows])
                        for group_val, rows in row_groups.items()}
          return {group_val: new_func(rows) for group_val, rows in row_groups.items()}
      
      
      #### TABULARIZING FUNCTIONS ####
      
      
      def is_list_like(x):
          return isinstance(x, (list, set, frozenset, tuple))
      
      
      def convert_to_list_of_dicts(rows):
          if is_list_like(rows):
              if not isinstance(rows, list):
                  rows = list(rows)
              if not rows:
                  return rows
              firstrow = rows[0]
              if isinstance(firstrow, dict):
                  return rows
              elif not is_list_like(firstrow):
                  return [{'col1': row} for row in rows]
              return [dict(flatten_row(row, [])) for row in rows]
          elif isinstance(rows, dict):
              if not rows:
                  return rows
              lists, dicts, other_vars = [], [], []
              for k, v in rows.items():
                  if is_list_like(v):
                      lists.append((k, v))
                  elif isinstance(v, dict):
                      dicts.append((k, v))
                  else:
                      other_vars.append((k, v))
              if not lists:
                  return [dict(flatten_row(rows, []))]
              out_rows = []
              max_len = max(len(v) for k, v in lists)
              base_row = dict(other_vars)
              base_row.update(flatten_row(dict(dicts), []))
              for ii in range(max_len):
                  new_row = base_row.copy()
                  for list_var, list_ in lists:
                      new_row[list_var] = None if ii >= len(list_) else list_[ii]
                  out_rows.append(dict(flatten_row(new_row, [])))
              return out_rows
          # rows is a scalar
          return [{'col1': rows}]
      
      
      def flatten_row(row, current_key):
          if is_list_like(row):
              if not isinstance(row, list):
                  row = list(row)
              for ii, v in enumerate(row):
                  current_key.append(f'col{ii}')
                  yield from flatten_row(v, current_key)
                  current_key.pop()
          elif isinstance(row, dict):
              for k, v in row.items():
                  current_key.append(k)
                  yield from flatten_row(v, current_key)
                  current_key.pop()
          elif not current_key:
              yield '_', row
          else:
              yield '.'.join(current_key), row
      
      #### PROCESSING FUNCTIONS ####
      
      def csv_to_json(text):
          # parse the text that was passed in, not whatever buffer is currently active
          lines = [x.rstrip() for x in text.splitlines()]
          sniffer = csv.Sniffer()
          dialect = sniffer.sniff(text[:4092])
          reader = csv.DictReader(lines, dialect=dialect)
          rows = list(reader)
          postprocess_json(rows, AUTODETECT_BOOLEANS)
          return rows
      
      
      def xml_to_json(text):
          root = ET.fromstring(text)
          js = [{e.tag: e.text for e in child} for child in root]
          postprocess_json(js, AUTODETECT_BOOLEANS)
          return js
      
      
      def jsonl_to_json(text):
          # skip blank lines, which json.loads cannot parse
          return [json.loads(line) for line in text.splitlines() if line.strip()]
      
      
      def postprocess_json(rows, autodetect_booleans):
          '''
          rows must be a list of dicts mapping str to str
          Performs the following in-place changes:
          1. Converts string representations of numbers (using '.' as decimal sep)
              to the corresponding numbers
          2. If autodetect_booleans, converts columns where all values are in the same
              boolean pair to the corresponding boolean value
              (e.g., columns of all 'T' and 'F' become True and False)
          '''
          bad_keys = set()
          boolean_pairs = {}
          if autodetect_booleans:
              for row in rows:
                  for k, v in row.items():
                      if k in bad_keys:
                          continue
                      # check if value is in the same boolean pair
                      # as all previous values in this column;
                      # a non-string value (e.g., None) disqualifies the column
                      current_boopair = boolean_pairs.get(k)
                      boopair = BOOLEAN_VALS.get(v.upper()) if isinstance(v, str) else None
                      if not boopair or (current_boopair and boopair is not current_boopair):
                          bad_keys.add(k)
                          boolean_pairs.pop(k, None)
                      else:
                          boolean_pairs[k] = boopair
          for row in rows:
              for k, v in row.items():
                  boopair = boolean_pairs.get(k)
                  if boopair:
                      row[k] = boopair[v.upper()]
                  elif isinstance(v, str) and num_regex.fullmatch(v):
                      row[k] = float(v)
      
      
      #### OUTPUT FUNCTIONS ####
      
      def dump_json(js_):
          return json.dumps(js_, indent=4)
          
      def dump_jsonl(js_):
          rows = convert_to_list_of_dicts(js_)
          return '\n'.join(json.dumps(row) for row in rows)
          
      def dump_csv(js_):
          rows = convert_to_list_of_dicts(js_)
          fieldnames = sorted(set(k for row in rows for k in row),
              key=lambda x: x.upper())
          strio = StringIO()
          writer = csv.DictWriter(strio, fieldnames, dialect=CSV_DEFAULT_OUTPUT_DIALECT)
          writer.writeheader()
          for row in rows:
              writer.writerow(row)
          return strio.getvalue()
          
      def dump_xml(js_):
          root = ET.Element('Table')
          rows = convert_to_list_of_dicts(js_)
          for row in rows:
              rowelement = ET.Element('Row')
              root.append(rowelement)
              for k, v in row.items():
                  velement = ET.Element(k)
                  velement.text = '' if v is None else str(v)
                  rowelement.append(velement)
          tree = ET.ElementTree(root)
          ET.indent(tree, ' ' * 4)
          strio = StringIO()
          tree.write(strio, encoding='unicode', xml_declaration=True, short_empty_elements=False)
          return strio.getvalue()
      
      def dump_to_regex_replace(js_, regex, bels):
          rows = convert_to_list_of_dicts(js_)
          # we want to use Boost regex syntax to parse our file
          notepad.new()
          newtext = '\n'.join(bels.join(str(v) for v in row.values()) for row in rows)
          editor.setText(newtext)
          search_regex = bels.join(f"(?'{k}'.*)" for k in rows[0])
          print(f'{search_regex = }, {regex = }')
          editor.rereplace(search_regex, regex)
          text = editor.getText()
          editor.setText('')
          notepad.close()
          return text
          
      
      • Mark Olson

        PART 2 (ADD TO SCRIPT AFTER PART 1 ABOVE)

        
        #### HELPER CLASSES ####
        
        class TabDef:
            def __init__(self, rows, data_type):
                self.data_type = data_type
                self.refresh(rows)
        
            def query(self, query):
                return eval(query, {
                    'collections': collections,
                    'math': math,
                    'avg': mean,
                    'mean': mean,
                    'groupby': group_by,
                    'group_by': group_by,
                    'join': inner_join,
                    'inner_join': inner_join,
                    're': re,
                    'x': self.rows,
                })
        
            def refresh(self, rows):
                self.rows = rows
                self.last_refresh = now()
                self.get_column_types()
                
            def get_column_types(self):
                if (not self.rows) or (isinstance(self.data_type, str) and self.data_type == 'json'):
                    # json data need not have columns, can be tree-like
                    self.column_types = None
                    return
                self.column_types = {}
                for row in self.rows:
                    if is_list_like(row):
                        for ii, v in enumerate(row):
                            self.column_types.setdefault(ii, set()).add(type(v))
                    elif isinstance(row, dict):
                        for k, v in row.items():
                            self.column_types.setdefault(k, set()).add(type(v))
                            
            def column_types_repr(self):
                parts = []
                for k, types in self.column_types.items():
                    tlist = sorted('None' if t is type(None) else t.__name__
                        for t in types)
                    parts.append(f"{repr(k)}: {'|'.join(tlist)}")
                # joining also gives '{}' (not a bare '}') when there are no columns
                return '{' + ', '.join(parts) + '}'
        
        
        class PyQuery:
            def __init__(self):
                self.tabdefs = {}
                self.mtimes = {}
                self.id_to_be_renamed = None
                self.fname_to_be_renamed = None
                self.remembered_queries = {}
                self.remembered_regexes = {}
                self.remembered_output_formats = {}
                self.delimiting_bels = {} # count BEL chars for rereplace templating
                self.current_fname = notepad.getCurrentFilename()
                self.query_file_path = os.path.join(PYQ_DIR, 'PyQuery query file.pyq')
                self.remembered_queries_fname = 'queries.json'
                self.remembered_regexes_fname = 'regexes.json'
                self.remembered_output_formats_fname = 'output_formats.json'
                
            def dump_if_not_empty(self, dict_, fname):
                if not os.path.exists(PYQ_DIR) or not dict_:
                    return
                abspath = os.path.join(PYQ_DIR, fname)
                with open(abspath, 'w') as f:
                    json.dump(dict_, f)
                
            def on_shutdown(self, notif):
                self.dump_if_not_empty(self.remembered_queries, self.remembered_queries_fname)
                self.dump_if_not_empty(self.remembered_regexes, self.remembered_regexes_fname)
                self.dump_if_not_empty(self.remembered_output_formats, self.remembered_output_formats_fname)
                
            def get_remembered_thing(self, attr, fname, default):
                '''attr is the name of a dict attribute
                ('remembered_queries' or 'remembered_regexes' or 'remembered_output_formats').
                fname is the name of the JSON file in PYQ_DIR where that dict is saved.
                Returns the thing remembered for the current file.
                If nothing is remembered for the current file, returns the thing
                remembered for the current file's extension, or default if there is none.
                '''
                if not getattr(self, attr):            
                    abspath = os.path.join(PYQ_DIR, fname)
                    if os.path.exists(abspath):
                        with open(abspath) as f:
                            setattr(self, attr, json.load(f))
                memo = getattr(self, attr)
                remembered_for_file = memo.get(self.current_fname)
                if remembered_for_file:
                    self.memorize_thing(remembered_for_file, memo, default)
                    return remembered_for_file
                exts = memo.get('extensions')
                if exts:
                    fname_ext = getFileExtension(self.current_fname)
                    return exts.get(fname_ext, default)
                return default
                
           
            def get_query(self):
                return self.get_remembered_thing('remembered_queries', self.remembered_queries_fname, DEFAULT_QUERY)
                
            def get_regex(self):
                return self.get_remembered_thing('remembered_regexes', self.remembered_regexes_fname, DEFAULT_REGEX)
                
            def get_output_format(self):
                return self.get_remembered_thing('remembered_output_formats',  self.remembered_output_formats_fname, DEFAULT_OUTPUT_TYPE)
        
            def memorize_thing(self, thing, dict_, default):
                if thing == default:
                    return
                dict_[self.current_fname] = thing
                ext = getFileExtension(self.current_fname)
                exts = dict_.setdefault('extensions', {})
                exts[ext] = thing
                
            def memorize_query(self, query):
                self.memorize_thing(query, self.remembered_queries, DEFAULT_QUERY)
                
            def memorize_output_format(self, output_format):
                self.memorize_thing(output_format, self.remembered_output_formats, DEFAULT_OUTPUT_TYPE)
                
            def memorize_regex(self, regex):
                self.memorize_thing(regex, self.remembered_regexes, DEFAULT_REGEX)
                
            def on_bufferactivated(self, notif):
                fname = notepad.getCurrentFilename()
                if fname == self.query_file_path:
                    return
                self.current_fname = fname
                try: # check for modifications outside of Notepad++
                    last_mod_outside_npp = os.path.getmtime(self.current_fname)
                except FileNotFoundError:
                    return # an in-memory buffer, can't be modified outside of NPP
                last_mod_in_buffer = self.mtimes.get(self.current_fname, 0.)
                self.mtimes[self.current_fname] = max(last_mod_in_buffer, last_mod_outside_npp)
                    
            def on_filebeforerename(self, notif):
                self.id_to_be_renamed = notif['bufferID']
                self.fname_to_be_renamed = notepad.getBufferFilename(self.id_to_be_renamed)
            
            def on_filerenamecancel(self, notif):
                self.id_to_be_renamed = None
                self.fname_to_be_renamed = None
            
            def on_filerenamed(self, notif):
                if not self.id_to_be_renamed: # was cancelled
                    return
                fname = notepad.getBufferFilename(self.id_to_be_renamed)
                print(f'{self.current_fname = }, {fname = }, {self.fname_to_be_renamed = }, {self.id_to_be_renamed = }')
                if self.fname_to_be_renamed == self.current_fname:
                    self.current_fname = fname
                mtime = self.mtimes.get(self.fname_to_be_renamed)
                if mtime:
                    self.mtimes[fname] = self.mtimes[self.fname_to_be_renamed]
                    del self.mtimes[self.fname_to_be_renamed]
                else:
                    self.mtimes[fname] = now()
                tabdef = self.tabdefs.get(self.fname_to_be_renamed)
                if tabdef:
                    self.tabdefs[fname] = tabdef
                    del self.tabdefs[self.fname_to_be_renamed]
                self.id_to_be_renamed = None
                self.fname_to_be_renamed = None
        
            def on_modified(self, notif):
                if not notif['text']:
                    return
                self.mtimes[self.current_fname] = now()
        
            def get_tabdef(self):
                tabdef = self.tabdefs.get(self.current_fname)
                if tabdef:
                    mtime = self.mtimes[self.current_fname]
                    if mtime < tabdef.last_refresh:
                        return tabdef
                else:
                    tabdef = TabDef(None, None)
                notepad.open(self.current_fname)
                extension = getFileExtension(self.current_fname)
                if tabdef.data_type:
                    data_type = tabdef.data_type
                else:
                    data_type = FILE_EXTENSION_TYPE_MAPS.get(extension)
                text = editor.getText()
                if not data_type:
                    for typ in TYPES_TO_TEST: # don't try csv, it finds illogical column separators
                        proc = self.get_processor(typ)
                        try:
                            rows = proc(text)
                            return self.bind_tabdef(tabdef, typ, rows)
                        except Exception as ex: # not the correct way to parse
                            print(f'While parsing as {typ} in try loop, got error:\n{ex}')
                    return None # couldn't parse
                else:
                    processor = self.get_processor(data_type)
                    try:
                        rows = processor(text)
                    except Exception as ex:
                        print(f'While parsing as {data_type}, got error:\n{ex}')
                        return None
                return self.bind_tabdef(tabdef, data_type, rows)
                
            def bind_tabdef(self, tabdef, data_type, rows):
                tabdef.data_type = data_type
                self.mtimes[self.current_fname] = now()
                self.tabdefs[self.current_fname] = tabdef
                tabdef.refresh(rows)
                return tabdef
                
            def get_delimiting_bels(self):
                '''
                we use BEL chars to delimit fields in the intermediate file
                when the output mode is a regex-replace
                EXAMPLE:
                If the query result is [{'col1': 0, 'col2': 'a'}, {'col1': 1, 'col2': 'b'}]
                the intermediate file contains
                0{BEL}a
                1{BEL}b
                where {BEL} is the BEL character.
                But if BEL is in the file's text, we need to use more bels to delimit.
                E.g., if the query result is [{'col1': 0, 'col2': 'a{BEL}'}, {'col1': 1, 'col2': 'b'}]
                the intermediate file will contain
                0{BEL}{BEL}a{BEL}
                1{BEL}{BEL}b{BEL}
                '''
                delimiting_bels = self.delimiting_bels.get(self.current_fname)
                if delimiting_bels:
                    return delimiting_bels
                self._consecutive_bel_count = 0
                def on_bel_match(m):
                    bel_ct = m.end() - m.start()
                    print(f'{self._consecutive_bel_count = }, {bel_ct = }')
                    self._consecutive_bel_count = max(self._consecutive_bel_count, bel_ct)
                editor.research('\x07+', on_bel_match)
                print(f'After finds, {self._consecutive_bel_count = }')
                delimiting_bels = '\x07' * (self._consecutive_bel_count + 1)
                self.delimiting_bels[self.current_fname] = delimiting_bels
                return delimiting_bels
        
            def write_to_pyq_buffer(self, text):
                if not os.path.exists(PYQ_DIR):
                    os.mkdir(PYQ_DIR)
                if not os.path.exists(self.query_file_path):
                    with open(self.query_file_path, 'w') as f:
                        f.write('')
                notepad.open(self.query_file_path)
                editor.setText(text)
            
            def regex_matches_to_json(self, regex):
                notepad.open(self.current_fname)
                matches = []
                editor.research(regex, lambda m: matches.append(m.groups()))
                self.memorize_regex(regex)
                rows = convert_to_list_of_dicts(matches)
                postprocess_json(rows, AUTODETECT_BOOLEANS)
                return rows
            
            def get_processor(self, data_type):
                return {
                    'csv': csv_to_json,
                    'json': json.loads,
                    'jsonl': jsonl_to_json,
                    'xml': xml_to_json,
                }.get(data_type,
                    lambda text: self.regex_matches_to_json(data_type))
                
            def dump(self, output_type, data=None):
                if data is None:
                    tabdef = self.get_tabdef()
                    data = tabdef.rows
                dumper = {
                    'json': dump_json,
                    'jsonl': dump_jsonl,
                    'csv':  dump_csv,
                    'xml':  dump_xml,
                }.get(output_type,
                    lambda rows: dump_to_regex_replace(rows, output_type, self.get_delimiting_bels()))
                outstr = dumper(data)
                self.memorize_output_format(output_type)
                self.write_to_pyq_buffer(outstr)
                
            def dialog(self):
                tabdef = self.get_tabdef()
                dtype = None if not tabdef else tabdef.data_type
                if not tabdef or dtype not in VALID_DATA_TYPES:
                    default_regex = self.get_regex()
                    regex = notepad.prompt(
                        f"Couldn't parse current file as any of the types {TYPES_TO_TEST}\nEnter csv to parse as csv, or a regex if you wish to query regex matches",
                        "Couldn't parse file; enter csv or a regex",
                        default_regex
                    )
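                    if not regex:
                        return
                    if regex.strip().lower() == 'csv': # the prompt offers csv as an option
                        rows = csv_to_json(editor.getText())
                        tabdef = self.bind_tabdef(TabDef(None, None), 'csv', rows)
                    else:
                        rows = self.regex_matches_to_json(regex)
                        tabdef = self.bind_tabdef(TabDef(None, None), regex, rows)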
                msg = f'Enter Python query on {tabdef.data_type} file. x represents data.'
                coltypes = '' if not (tabdef and tabdef.column_types) \
                    else f'Col types: {tabdef.column_types_repr()}'
                default_query = self.get_query()
                query = notepad.prompt(coltypes, msg, default_query)
                if not query:
                    return
                output_type = notepad.prompt('Enter an output format (regex or one of json, jsonl, csv, xml)', 'Enter an output format', self.get_output_format())
                if not output_type:
                    return
                self.pyq(query, output_type)
                
            def pyq(self, query, output_type=None):
                tabdef = self.get_tabdef()
                if not tabdef:
                    self.write_to_pyq_buffer(f'{self.current_fname} could not be parsed as a file of any of the types {VALID_DATA_TYPES}')
                    return
                if not output_type:
                    output_type = DEFAULT_OUTPUT_TYPE
                try:
                    result = tabdef.query(query)
                except Exception as ex:
                    self.write_to_pyq_buffer(f'Failure while parsing query:\n{ex}')
                    return
                self.memorize_query(query)
                self.dump(output_type, result)
        
        
        if __name__ == '__main__':
            try:
                INITIALIZED
            except NameError:
                INITIALIZED = True
                PYQ = PyQuery()
                pyq = PYQ.pyq
                dump = PYQ.dump
                notepad.callback(PYQ.on_bufferactivated, [NOTIFICATION.BUFFERACTIVATED])
                notepad.callback(PYQ.on_filebeforerename, [NOTIFICATION.FILEBEFORERENAME])
                notepad.callback(PYQ.on_filerenamecancel, [NOTIFICATION.FILERENAMECANCEL])
                notepad.callback(PYQ.on_filerenamed, [NOTIFICATION.FILERENAMED])
                notepad.callback(PYQ.on_shutdown, [NOTIFICATION.BEFORESHUTDOWN])
                editor.callback(PYQ.on_modified, [SCINTILLANOTIFICATION.MODIFIED])
            if DIALOG_ON_RUN:
                PYQ.dialog()
        
          Mark Olson

          I should probably mention some quality-of-life features that might make you want to consider using this tool:

          1. Numbers are automatically parsed as floating-point numbers, so there is no need for any annoying convert-to-number-if-passes-regex-test code.
          2. The tool detects when the file is edited and re-parses it only if required.
          3. Booleans are detected automatically.
          4. Regular expressions are supported in the query (via the Python re functions).
          5. Conversion speed seems to be competitive with any other plugin I’ve seen.
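A minimal sketch of what points 1 and 3 amount to, assuming a cell is tried as a boolean first and then as a float. The name `coerce_cell` and the accepted boolean spellings are assumptions for illustration, not the script's actual implementation.

```python
def coerce_cell(text):
    """Hypothetical helper: turn a raw string cell into a bool, float, or str."""
    stripped = text.strip()
    lowered = stripped.lower()
    if lowered in ('true', 'false'):   # assumed boolean spellings
        return lowered == 'true'
    try:
        return float(stripped)         # handles 27.1, -102.1, 1.2E-3, ...
    except ValueError:
        return text                    # anything else stays a string


print([coerce_cell(c) for c in ['27.1', '1.2E-3', 'true', 'taco']])
```

With values coerced up front, a query can compare numbers directly (for example `row['col1'] > 0`) instead of converting strings inside every comprehension.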

          I’ll also just illustrate a regex-related use case that would normally require a bespoke script.
          Start with

          foo: tone bar: 27.1
          foo: bLue bar: 241.3
          foo: taco bar: 66.5
          foo: basst bar: 281.7
          foo: öyster bar: 144.0
          foo: oyster bar: 164.1
          foo: Spb bar: -102.1
          foo: blüe bar: 203.2
          foo: spä bar: 1.2E-3
          foo: spb bar: 84.2e4
          foo: blve bar: 183.2
          foo: blue bar: -222.2
          foo: baßk bar: 261.3
          foo: täco bar: 47.3e5
          foo: täco bar: 300.3e5
          

          In the first prompt, enter (?-s)^foo: (\w+) bar: (.+)$. This captures the word after foo: as col0 and the number after bar: as col1.
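To make the col0/col1 naming concrete, here is a rough sketch of turning regex matches into rows using plain Python `re`. It assumes `re` is a fair stand-in for the engine the tool actually uses; the `(?-s)` prefix is dropped because Python's `re` does not accept it in that form, and `.` excludes newlines by default anyway. The helper names are hypothetical.

```python
import re

# re.MULTILINE makes ^ and $ match at the start and end of every line.
PATTERN = re.compile(r'^foo: (\w+) bar: (.+)$', re.MULTILINE)


def number_or_str(text):
    """Hypothetical helper: parse as float when possible, else keep the string."""
    try:
        return float(text)
    except ValueError:
        return text


def regex_rows(text):
    """One dict per match, with capture group i stored under 'col<i>'."""
    return [
        {f'col{i}': number_or_str(group) for i, group in enumerate(m.groups())}
        for m in PATTERN.finditer(text)
    ]


sample = 'foo: tone bar: 27.1\nfoo: spä bar: 1.2E-3\nfoo: Spb bar: -102.1\n'
rows = regex_rows(sample)
print(rows)
```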

          Now we want to sort these lines by the number after bar: (and secondarily by the word after foo:, ignoring case) and eliminate any lines where the number after bar: is not greater than 0.
          So in the second prompt, enter sorted((r for r in x if r['col1'] > 0), key=lambda x: [x['col1'], x['col0'].upper()])
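The same query can be tried outside the tool on a few hand-written rows. The rows below are made up for illustration (two share the same number so the secondary key matters), and the lambda parameter is renamed to `row` so it does not shadow the data variable `x`.

```python
# Made-up sample rows in the shape the regex parse is described as producing.
x = [
    {'col0': 'taco', 'col1': 66.5},
    {'col0': 'Spb', 'col1': -102.1},
    {'col0': 'tone', 'col1': 27.1},
    {'col0': 'blue', 'col1': 27.1},
]

# Keep rows with a positive number, then sort by number and, for ties,
# by the upper-cased word.
result = sorted(
    (r for r in x if r['col1'] > 0),
    key=lambda row: [row['col1'], row['col0'].upper()],
)

for row in result:
    print(row)
```

Lists compare element by element, so the word is only consulted when two rows have equal numbers.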

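          And let’s get it back in the original format. In the final prompt, enter foo: $1 bar: $2
          We get the filtered, sorted rows. Note that the numbers are written back as parsed floats, so 1.2E-3 comes out as 0.0012 and 84.2e4 as 842000.0: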

          foo: spä bar: 0.0012
          foo: tone bar: 27.1
          foo: taco bar: 66.5
          foo: öyster bar: 144.0
          foo: oyster bar: 164.1
          foo: blve bar: 183.2
          foo: blüe bar: 203.2
          foo: bLue bar: 241.3
          foo: baßk bar: 261.3
          foo: basst bar: 281.7
          foo: spb bar: 842000.0
          foo: täco bar: 4730000.0
          foo: täco bar: 30030000.0
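A rough sketch of how a `$1`/`$2` output format could be filled in from rows. This is a simplified stand-in under stated assumptions: the function name `render` is hypothetical, `$0` is not handled, and the script itself goes through an intermediate file and a regex replace rather than this direct substitution.

```python
import re


def render(rows, template):
    """Hypothetical: replace $N with the N-th column (1-based) of each row."""
    def fill(row):
        values = list(row.values())
        return re.sub(
            r'\$(\d+)',
            lambda m: str(values[int(m.group(1)) - 1]),
            template,
        )
    return '\n'.join(fill(row) for row in rows)


rows = [
    {'col0': 'spä', 'col1': 0.0012},
    {'col0': 'spb', 'col1': 842000.0},
]
output = render(rows, 'foo: $1 bar: $2')
print(output)
```

Because the numbers are floats by this point, they are printed in Python's float notation rather than in the notation of the source file.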
          
            Lycan Thrope @Mark Olson

            @Mark-Olson ,
            For consistency, what should this file be named? :-) It's always good to have a name so people know what is being talked about when referring to the script. :-)

              Mark Olson @Lycan Thrope

              @Lycan-Thrope
              pyquery.py

                Lycan Thrope @Mark Olson

                @Mark-Olson ,
                Thanks. I saw the reference to the filename at the end of the file, but wanted to be sure, for future reference. :-)

                  Mark Olson

                  Permanent link to most up-to-date version

                  Things fixed since my original post above:

                  1. Caching problems (my original assertion that files were re-parsed only as needed was actually false).
                  2. Originally, if you used $0 in your output format to get all the matched text (not just the capture groups), the output file would contain the BEL characters I used to delimit the capture groups in the intermediate file.
                  3. group_by and join can now group by/join by multiple columns. See the docs in the file.
                  4. Added byind and bykey convenience functions for easier sorting by index/key in child.
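For readers unfamiliar with grouping by several columns at once, here is a minimal sketch of the general idea using a tuple of column values as the group key. `group_rows` is a hypothetical stand-in with made-up data; it is not the script's `group_by`, whose real signature is documented in the file.

```python
from collections import defaultdict


def group_rows(rows, keys):
    """Hypothetical: map each tuple of key-column values to its rows."""
    groups = defaultdict(list)
    for row in rows:
        groups[tuple(row[k] for k in keys)].append(row)
    return dict(groups)


# Made-up rows for illustration only.
rows = [
    {'city': 'Oslo', 'year': 2020, 'n': 1},
    {'city': 'Oslo', 'year': 2021, 'n': 2},
    {'city': 'Oslo', 'year': 2020, 'n': 3},
]
grouped = group_rows(rows, ['city', 'year'])
for key, members in grouped.items():
    print(key, [m['n'] for m in members])
```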