diff --git a/vscode_projects/__init__.py b/vscode_projects/__init__.py index 7ed263c..4a73c4f 100644 --- a/vscode_projects/__init__.py +++ b/vscode_projects/__init__.py @@ -1,14 +1,17 @@ # -*- coding: utf-8 -*- # Copyright (c) 2024 Sharsie +from albert import * +from dataclasses import dataclass import os import json from pathlib import Path -from dataclasses import dataclass -from albert import * +import sqlite3 +import threading +from urllib.parse import urlparse, unquote md_iid = "3.0" -md_version = "1.10" +md_version = "1.11" md_name = "VSCode projects" md_description = "Open VSCode projects" md_url = "https://github.com/albertlauncher/python/tree/master/vscode_projects" @@ -16,11 +19,13 @@ md_bin_dependencies = ["code"] md_authors = ["@Sharsie"] + @dataclass class Project: displayName: str name: str path: str + isDirectory: bool tags: list[str] @@ -40,20 +45,19 @@ class CachedConfig: class Plugin(PluginInstance, TriggerQueryHandler): - # Possible locations for Code configuration - _configStoragePaths = [ - os.path.join(os.environ["HOME"], ".config/Code/storage.json"), - os.path.join(os.environ["HOME"], - ".config/Code/User/globalStorage/storage.json"), - ] - - # Possible locations for Project Manager extension configuration - _configProjectManagerPaths = [ - os.path.join( - os.environ["HOME"], ".config/Code/User/globalStorage/alefragnani.project-manager/projects.json") - ] - - # Indicates whether results from the Recent list in VSCode should be searched + # Location of the database with recent entries + _stateDBPath = os.path.join( + os.environ["HOME"], + ".config/Code/User/globalStorage/state.vscdb" + ) + + # Location of the project manager configuration file + _projectsPath = os.path.join( + os.environ["HOME"], + ".config/Code/User/globalStorage/alefragnani.project-manager/projects.json" + ) + + # Indicates whether results from the Recent list should be searched _recentEnabled = True # Indicates whether projects from Project Manager extension should be searched @@ -67,13 +71,16 @@ class Plugin(PluginInstance, TriggerQueryHandler): "Recent": 15 } - # Holds cached data from the json configurations + # Holds cached data from the configurations _configCache: dict[str, CachedConfig] = {} + # Holds the last time the database was queried + _dbLastQueryDBModTime: float = 0 + # Overrides the command to open projects _terminalCommand = "" - # Setting indicating whether results from the Recent list in VSCode should be searched + # Setting indicating whether results from the Recent list should be searched @property def recentEnabled(self): return self._recentEnabled @@ -93,13 +100,7 @@ def projectManagerEnabled(self, value): self._projectManagerEnabled = value self.writeConfig("projectManagerEnabled", value) - found = False - for p in self._configProjectManagerPaths: - if os.path.exists(p): - found = True - break - - if found == False: + if not os.path.exists(self._projectsPath): warning( "Project Manager search was enabled, but configuration file was not found") notif = Notification( @@ -171,15 +172,8 @@ def __init__(self): TriggerQueryHandler.__init__(self) - configFound = False - - for p in self._configStoragePaths: - if os.path.exists(p): - configFound = True - break - - if not configFound: - warning("Could not find any VSCode configuration directory") + if not os.path.exists(self._stateDBPath): + warning("Could not find the state database") self._initConfiguration() @@ -187,7 +181,7 @@ def configWidget(self): return [ { "type": "label", - "text": """Recent files are sorted in order found in the VSCode configuration. + "text": """Recent files are sorted in order found in the state. Sort order with Project Manager can be adjusted, lower number = higher priority = displays first. With all priorities equal, PM results will take precedence over recents.""" }, @@ -247,7 +241,8 @@ def configWidget(self): { "type": "label", "text": """ -The way VSCode is opened can be overriden through terminal command. +The way VSCode is opened can be overridden through terminal command. +This only works for projects, or recent directories, not workspaces or recent files. Terminal will enter the working directory of the project upon selection, execute the command and then close itself. Usecase with direnv - To load direnv environment before opening VSCode, enter the following custom command: direnv exec . code . @@ -270,17 +265,10 @@ def _initConfiguration(self): else: self._recentEnabled = recentEnabled - # Project Manager search - foundPM = False - for p in self._configProjectManagerPaths: - if os.path.exists(p): - foundPM = True - break - projectManagerEnabled = self.readConfig('projectManagerEnabled', bool) if projectManagerEnabled is None: # If not configured, check if the project manager configuration file exists and if so, enable PM search - if foundPM: + if os.path.exists(self._projectsPath): self._projectManagerEnabled = True self.writeConfig("projectManagerEnabled", True) else: @@ -305,38 +293,41 @@ def handleTriggerQuery(self, query): if not query.isValid: return - if query.string == "": - return - - matcher = Matcher(query.string) - results: dict[str, SearchResult] = {} - if self.recentEnabled: - results = self._searchInRecentFiles(matcher, results) + if query.string != "": + matcher = Matcher(query.string) - if self.projectManagerEnabled: - results = self._searchInProjectManager(matcher, results) + if self.recentEnabled: + results = self._searchInRecentFiles(matcher, results) + + if self.projectManagerEnabled: + results = self._searchInProjectManager(matcher, results) + elif self.recentEnabled: + results = self._searchInRecentFiles(Matcher(""), results) sortedItems = sorted(results.values(), key=lambda item: "%s_%s_%s" % ( '{:03d}'.format(item.priority), '{:03d}'.format(item.sortIndex), item.project.name), reverse=False) items: list[StandardItem] = [] for i in sortedItems: - items.append(self._createItem(i.project, query)) + items.append(self._createItem(i.project)) query.add(items) # Creates an item for the query based on the project and plugin settings - def _createItem(self, project: Project, query: Query) -> StandardItem: + def _createItem(self, project: Project) -> StandardItem: actions: list[Action] = [] - if self.terminalCommand != "": + # Only add terminal command action if the project is a directory + # Handling single files or workspaces would likely over-complicate the code and options + if self.terminalCommand != "" and project.isDirectory: actions.append( Action( id="open-terminal", text=f"Run terminal command in project's workdir: {self.terminalCommand}", - callable=lambda: runTerminal(f"cd {project.path} && {self.terminalCommand}") + callable=lambda: runTerminal( + f"cd {project.path} && {self.terminalCommand}") ) ) @@ -366,185 +357,244 @@ def _createItem(self, project: Project, query: Query) -> StandardItem: def _searchInRecentFiles(self, matcher: Matcher, results: dict[str, SearchResult]) -> dict[str, SearchResult]: sortIndex = 1 - for path in self._configStoragePaths: - c = self._getStorageConfig(path) - for proj in c.projects: - # Resolve sym links to get unique results - resolvedPath = str(Path(proj.path).resolve()) - if matcher.match(proj.name) or matcher.match(proj.path) or matcher.match(resolvedPath): - results[resolvedPath] = self._getHigherPriorityResult( - SearchResult( - project=proj, - priority=self.priorityRecent, - sortIndex=sortIndex - ), - results.get(resolvedPath), - ) + c = self._getDBConfig() + for proj in c.projects: + # Resolve symlinks to get unique results + resolvedPath = str(Path(proj.path).resolve()) + + if matcher.match(proj.name) or matcher.match(proj.path) or matcher.match(resolvedPath): + results[resolvedPath] = self._getHigherPriorityResult( + SearchResult( + project=proj, + priority=self.priorityRecent, + sortIndex=sortIndex + ), + results.get(resolvedPath), + ) - if results.get(resolvedPath) is not None: - sortIndex += 1 + if results.get(resolvedPath) is not None: + sortIndex += 1 return results def _searchInProjectManager(self, matcher: Matcher, results: dict[str, SearchResult]) -> dict[str, SearchResult]: - for path in self._configProjectManagerPaths: - c = self._getProjectManagerConfig(path) - for proj in c.projects: - # Resolve sym links to get unique results - resolvedPath = str(Path(proj.path).resolve()) - if matcher.match(proj.name): - results[resolvedPath] = self._getHigherPriorityResult( - SearchResult( - project=proj, - priority=self.priorityPMName, - sortIndex=0 if matcher.match(proj.name).isExactMatch() else 1 - ), - results.get(resolvedPath), - ) + c = self._getProjectManagerConfig(self._projectsPath) + for proj in c.projects: + # Resolve symlinks to get unique results + resolvedPath = str(Path(proj.path).resolve()) + + if matcher.match(proj.name): + results[resolvedPath] = self._getHigherPriorityResult( + SearchResult( + project=proj, + priority=self.priorityPMName, + sortIndex=0 if matcher.match( + proj.name).isExactMatch() else 1 + ), + results.get(resolvedPath), + ) + if self.priorityPMName < self.priorityPMPath and self.priorityPMName < self.priorityPMTag: + # PM name takes highest precedence, continue with next project + continue - if matcher.match(proj.path) or matcher.match(resolvedPath): + if matcher.match(proj.path) or matcher.match(resolvedPath): + results[resolvedPath] = self._getHigherPriorityResult( + SearchResult( + project=proj, + priority=self.priorityPMPath, + sortIndex=1 + ), + results.get(resolvedPath), + ) + if self.priorityPMPath < self.priorityPMTag: + # PM path takes precedence over tags, continue with next project + continue + + for tag in proj.tags: + if matcher.match(tag): results[resolvedPath] = self._getHigherPriorityResult( SearchResult( project=proj, - priority=self.priorityPMPath, + priority=self.priorityPMTag, sortIndex=1 ), results.get(resolvedPath), ) - - for tag in proj.tags: - if matcher.match(tag): - results[resolvedPath] = self._getHigherPriorityResult( - SearchResult( - project=proj, - priority=self.priorityPMTag, - sortIndex=1 - ), - results.get(resolvedPath), - ) - break + break return results # Compares the search results to return the one with higher priority - # For nitpickers: higher priorty = lower number + # For nitpickers: higher priority = lower number def _getHigherPriorityResult(self, current: SearchResult, prev: SearchResult | None) -> SearchResult: if prev is None or current.priority < prev.priority or (current.priority == prev.priority and current.sortIndex < prev.sortIndex): return current return prev - def _getStorageConfig(self, path: str) -> CachedConfig: - c: CachedConfig = self._configCache.get(path, CachedConfig([], 0)) + def _getDBConfig(self) -> CachedConfig: + c: CachedConfig = self._configCache.get( + self._stateDBPath, CachedConfig([], 0)) - if not os.path.exists(path): + # Do not proceed if the database does not exist + # Storage location has been changing over time + if not os.path.exists(self._stateDBPath): return c - mTime = os.stat(path).st_mtime + # Get the modification time of the database + mTime = os.stat(self._stateDBPath).st_mtime + # If the modification time is the same as the cached one, return the cached config if mTime == c.mTime: return c - c.mTime = mTime - - with open(path) as configFile: - # Load the storage json - storageConfig = json.loads(configFile.read()) - - if ( - "lastKnownMenubarData" in storageConfig - and "menus" in storageConfig["lastKnownMenubarData"] - and "File" in storageConfig["lastKnownMenubarData"]["menus"] - and "items" in storageConfig["lastKnownMenubarData"]["menus"]["File"] - ): - # These are all the menu items in File dropdown - for menuItem in storageConfig["lastKnownMenubarData"]["menus"]["File"]["items"]: - # Cannot safely detect proper menu item, as menu item IDs change over time - # Instead we will search all submenus and check for IDs inside the submenu items - if ( - not "id" in menuItem - or not "submenu" in menuItem - or not "items" in menuItem["submenu"] - ): - continue - - for submenuItem in menuItem["submenu"]["items"]: - # Check of submenu item with id "openRecentFolder" and make sure it contains necessarry keys - if ( - not "id" in submenuItem - or submenuItem['id'] != "openRecentFolder" - or not "enabled" in submenuItem - or submenuItem["enabled"] != True - or not "label" in submenuItem - or not "uri" in submenuItem - or not "path" in submenuItem["uri"] - ): - continue - - # Get the full path to the project - recentPath = submenuItem["uri"]["path"] - if not os.path.exists(recentPath): - continue - - displayName = recentPath.split("/")[-1] - - # Inject the project - c.projects.append(Project( - displayName=displayName, - name=displayName, - path=recentPath, - tags=[], - )) - - self._configCache[path] = c + # While querying DB is in progress, return the cached config + if self._dbLastQueryDBModTime == mTime: + return c + self._dbLastQueryDBModTime = mTime + + # Process the database and update cached config returning it + def _process_storage(): + # Create a config with modification time of the database + newConf = CachedConfig([], 0) + newConf.mTime = mTime + try: + with sqlite3.connect(self._stateDBPath) as con: + # Load recent entries + for row in con.execute('SELECT * FROM "ItemTable" WHERE KEY = \'history.recentlyOpenedPathsList\''): + # Parse the recent entries into a json for further processing + data = json.loads(row[1]) + + for entry in data["entries"]: + isDirectory = False + isWorkspace = False + + # Get the full path to the recent entry + if "folderUri" in entry: + isDirectory = True + parsed_uri = urlparse( + entry["folderUri"] + ) + elif "workspace" in entry: + isWorkspace = True + parsed_uri = urlparse( + entry["workspace"]["configPath"] + ) + elif "fileUri" in entry: + parsed_uri = urlparse( + entry["fileUri"] + ) + else: + continue + + # Only support file URIs + if parsed_uri.scheme != "file": + continue + + # Get the full path to the project + recentPath = Path( + unquote(parsed_uri.path)).as_posix() + + # Make sure the path exists, so we skip removed directories + if not os.path.exists(recentPath): + continue + + # Get the name of the project from the path + displayName = os.path.basename(recentPath) + if isWorkspace: + # Remove .code-workspace from the display name + if recentPath.endswith(".code-workspace"): + displayName = displayName[:-15] + displayName += " (Workspace)" + + # Add the project to the config + newConf.projects.append(Project( + displayName=displayName, + isDirectory=isDirectory, + name=displayName, + path=recentPath, + tags=[], + )) + except Exception as e: + warning(f"Failed to read the state database: {e}") + + # Update the cache only if this is the latest pass + if self._dbLastQueryDBModTime == mTime: + self._configCache[self._stateDBPath] = newConf + + return newConf + + # If the config was not loaded yet, block execution + if c.mTime == 0: + return _process_storage() + + # Process the database in a separate thread to update it in the background + thread = threading.Thread(target=_process_storage) + thread.start() + + # Return the cached config return c def _getProjectManagerConfig(self, path: str) -> CachedConfig: c = self._configCache.get(path, CachedConfig([], 0)) + # Do not proceed if the configuration file does not exist (e.g. PM was uninstalled while albert was running) if not os.path.exists(path): return c + # Get the modification time of the configuration file mTime = os.stat(path).st_mtime + # If the modification time is the same as the cached one, return the cached config if mTime == c.mTime: return c + # Update the cached modification time c.mTime = mTime - with open(path) as configFile: - configuredProjects = json.loads(configFile.read()) - - for p in configuredProjects: - # Make sure we have necessarry keys - if ( - not "rootPath" in p - or not "name" in p - or not "enabled" in p - or p["enabled"] != True - ): - continue - - # Grab the path to the project - rootPath = p["rootPath"] - if os.path.exists(rootPath) == False: - continue + try: + # Parse the configuration file + with open(path) as configFile: + configuredProjects = json.loads(configFile.read()) - project = Project( - displayName=p["name"], - name=p["name"], - path=rootPath, - tags=[], - ) + for p in configuredProjects: + # Make sure we have necessary keys + if ( + not "rootPath" in p + or not "name" in p + or not "enabled" in p + or p["enabled"] != True + ): + continue - # Search against the query string - if "tags" in p: - for tag in p["tags"]: - project.tags.append(tag) + # Grab the path to the project + rootPath = p["rootPath"] + if os.path.exists(rootPath) == False: + continue - c.projects.append(project) + project = Project( + displayName=p["name"], + isDirectory=True, + name=p["name"], + path=rootPath, + tags=[], + ) - self._configCache[path] = c + # Search against the query string + if "tags" in p: + for tag in p["tags"]: + project.tags.append(tag) + + c.projects.append(project) + + # Update the cache + self._configCache[path] = c + except IOError: + warning(f"Failed to read the PM configuration file: {path}") + except (json.JSONDecodeError): + warning(f"Failed to parse the PM configuration file: {path}") + except Exception as e: + warning(f"PM configuration file error: {e}") return c