Skip to content

API Reference

CometUSB Core

Operating_System

This is the main object that holds all the information about the Operating System, the target device and its partitions, the BIOS type, etc. Constructing the object asks for the target disk, then wipes and formats it. Once the create method is invoked, it mounts the new partitions, downloads and extracts the files, rebuilds the OS image and applies the bootloader.

Source code in cometusb/cometusb.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
class Operating_System():
    """
    Main object holding everything needed to build the bootable media: the Operating System
    name, the BIOS type, the partition style, the target disk with its partitions and the
    files to download.

    Constructing the object already asks the user for the target disk and formats it
    (through get_disk_details and format_disk). Calling create afterwards mounts the new
    partitions, downloads and extracts the files, rebuilds the OS image and applies the bootloader.
    """

    # USB capacities (in GB) considered when recommending the minimum disk size, smallest first.
    _USB_SIZES_GB = (4, 8, 16, 32, 64)

    def __init__(self, name: str, bios_type: str):
        # The order matters: the size setter needs the name and the download URL,
        # formatting needs the disk, BIOS type and size, and the files setter needs the partitions.
        self.name = name
        self._path_url = f"https://github.com/CometUSB/CometUSB/releases/download/{self.name}/"
        self.disk_size_reqd = self.name
        self.bios_type = bios_type
        self.partition_style = self.bios_type
        self._target_disk = get_disk_details()
        self._partitions = format_disk(self._target_disk, self.bios_type, self.disk_size_reqd) # Dictionary of newly created partitions with labels.
        self.files = self.name
        self._architecture = "64 Bit"

    def __str__(self) -> str:
        return f"\nOS = {self.name.capitalize()}\nArchitecture = {self._architecture}\nTarget System BIOS Type = {self.bios_type.upper()}\nTarget Device = {self._target_disk}\nPartition Style = {self.partition_style.upper()}\nFiles to be Downloaded = {list(self.files)}\n"

    @property
    def name(self) -> str:
        return self._name

    @name.setter
    def name(self, name: str) -> None:
        """
        Sets name of the Operating System, exits if it is not a key of the OS table.

        :param name: Name of the Operating System.
        :type name: str
        """
        if name not in OS:
            # The option that lists the supported systems is --list-os (see main).
            sys.exit("[!] Invalid or Unsupported Operating System.\nEnter 'cometusb.py --list-os' without quotes to see the supported list of Operating systems")

        self._name: str = name

    @property
    def partition_style(self) -> str:
        return self._partition_style

    @partition_style.setter
    def partition_style(self, bios_type: str) -> None:
        """
        Sets the partition style i.e, GPT for "uefi" and MBR for anything else.

        :param bios_type: BIOS firmware type i.e, UEFI or Legacy.
        :type bios_type: str
        """
        self._partition_style: str = "GPT" if bios_type == "uefi" else "MBR"

    @property
    def bios_type(self) -> str:
        return self._bios_type

    @bios_type.setter
    def bios_type(self, bios_type: str) -> None:
        """
        Sets the BIOS type, exits if it is neither "uefi" nor "legacy".

        :param bios_type: BIOS firmware type i.e, UEFI or Legacy.
        :type bios_type: str
        """
        if bios_type not in ("uefi", "legacy"):
            sys.exit("[!] Invalid BIOS type.")

        self._bios_type: str = bios_type

    @property
    def files(self) -> dict:
        return self._files

    @files.setter
    def files(self, name: str) -> None:
        """
        Selects files of given Operating System for Installation and maps every file
        name to the mount directory it has to be downloaded into.

        :param name: Name of the Operating System.
        :type name: str
        """
        labels = list(self._partitions) # Labels of the newly created partitions.

        if len(labels) == 2:
            self._boot_partition, self._files_partition = labels # Labels of boot and files partitions.

        elif len(labels) == 1:
            # Single partition layout: boot files and installation files share it.
            self._files_partition: str = labels[0]
            self._boot_partition: str = labels[0]

        else:
            sys.exit("[!] Unexpected partition layout on the target disk, please retry.")

        self._files: dict = {} # This will have file name with it's path.

        for file in OS[name]["files"]:
            if file == "boot.zip":
                self._files[file] = f"/mnt/{self._boot_partition}/"

            else:
                self._files[file] = f"/mnt/{self._files_partition}/"

    @property
    def disk_size_reqd(self):
        return self._disk_size_reqd

    @classmethod
    def _smallest_usb_size(cls, total_bytes: int) -> int:
        """
        Returns the smallest known USB size (in GB) strictly larger than twice the download size.

        :param total_bytes: Combined size of all the files to download, in bytes.
        :type total_bytes: int
        :return: USB size in GB.
        :rtype: int
        :raises ValueError: If even the largest known USB size is too small.
        """
        gib = 1024 * 1024 * 1024
        for usb_size in cls._USB_SIZES_GB:
            # Twice the download size because merging the splitted OS image temporarily needs
            # room for both the parts and the merged file. Integer comparison also works
            # when the size is 0 (no content-length reported).
            if usb_size * gib > 2 * total_bytes:
                return usb_size

        raise ValueError("no supported USB size is large enough")

    @disk_size_reqd.setter
    def disk_size_reqd(self, name: str):
        """
        Calculates the size of disk required for process from the content-length
        reported for every file of the Operating System, exits if no known USB size is enough.

        :param name: Name of the Operating System.
        """

        total_size: int = 0
        for filename in OS[name]["files"]:
            # stream = True so only the headers are fetched, not the file body.
            with requests.get(self._path_url + filename, stream = True) as response:
                response.raise_for_status()
                total_size += int(response.headers.get("content-length", 0))

        try:
            self._disk_size_reqd = self._smallest_usb_size(total_size)
        except ValueError:
            sys.exit(f"[!] Files of {name} need more space than the largest supported disk ({max(self._USB_SIZES_GB)} GB).")

    def bootloader(self) -> None:
        """
        Applies bootloader to the partition of the target disk containing boot files.
        Exits if grub-install reports a failure.
        """
        print(f"Applying bootloader on {self._target_disk} for {self.bios_type.upper()} systems...")
        boot_directory = f"--boot-directory=/mnt/{self._boot_partition}/boot"
        if self.bios_type == "uefi":
            command = ["sudo", "grub-install", "--target=x86_64-efi", f"--efi-directory=/mnt/{self._boot_partition}", boot_directory, "--removable"]

        else:
            command = ["sudo", "grub-install", "--target=i386-pc", boot_directory, f"{self._target_disk}"]

        # Without a bootloader the media cannot boot, so do not report success after a failure.
        if subprocess.run(command).returncode != 0:
            sys.exit("[!] Failed to apply the bootloader, please retry.")

    def create(self) -> None:
        """
        This method calls all the required functions in sequence to perform the necessary operations to make the bootable media.
        """
        # Mounting newly created partitions.
        mount_usb(self._partitions)

        # Disk configuration info.
        print(f"\n[*] Disk {self._target_disk} configuration.")
        subprocess.run(["lsblk", self._target_disk, "-o", "NAME,SIZE,FSTYPE,FSVER,LABEL,MOUNTPOINTS"])

        image_name = None # Name of the OS image to rebuild, known once its first part (.aa) is seen.
        image_dir = None # Directory holding the splitted image files.

        for filename, download_dir in self.files.items():
            print() #To create gap between progress bars.
            print(f"Downloading {filename} into {download_dir}")
            downloader(self._path_url + filename, download_dir)
            print() #To create gap between progress bars.
            if filename.endswith(".zip"):
                # Extracting to create the directory tree structure
                extractor(download_dir + filename, download_dir)
                print() #To create gap between progress bars.
                os.remove(download_dir + filename) # Removing the zip file after extracting to free space in the USB.
            elif filename.endswith(".aa"):
                # removesuffix drops exactly ".aa"; rstrip(".aa") would strip every trailing "a" and ".".
                image_name, image_dir = filename.removesuffix(".aa"), download_dir

        if image_name is not None:
            image_folder = OS[self.name]["image_folder"]

            # Reconstructing original OS image from splitted image files.
            print(f"[*] Making OS Image {image_name} ready for installation.\n[*] This may take a while depending upon your removable disk {self._target_disk}.")
            # The shell is needed for the wildcard and the redirection; every name comes from the built-in OS table.
            merged = subprocess.run(f"sudo cat {image_dir}{image_name}.* > {image_dir}{image_folder}{image_name}", shell=True, stdout=subprocess.DEVNULL)
            if merged.returncode != 0:
                # Keep the splitted files: deleting them now would lose the downloaded data.
                sys.exit(f"[!] Failed to make OS Image {image_name}, please retry.")
            print(f"[*] OS image is placed in {image_folder} successfully.")

            # Removing splitted image files.
            print("[*] Cleaning unnecessary files...")
            for splitted_image_file in glob.glob(f"{image_dir}{image_name}.*"):
                os.remove(splitted_image_file)

        # Applying bootloader
        self.bootloader()

        print("Media created succesfully\nNOTE: Linux disk sometimes is not detected in BIOS, try disabling secure boot of your BIOS if facing any issues while booting.")

bootloader()

Applies bootloader to the partition of the target disk containing boot files.

Source code in cometusb/cometusb.py
180
181
182
183
184
185
186
187
188
189
def bootloader(self) -> None:
    """
    Applies bootloader to the partition of the target disk containing boot files.
    Exits if grub-install reports a failure.
    """
    print(f"Applying bootloader on {self._target_disk} for {self.bios_type.upper()} systems...")
    boot_directory = f"--boot-directory=/mnt/{self._boot_partition}/boot"
    if self.bios_type == "uefi":
        command = ["sudo", "grub-install", "--target=x86_64-efi", f"--efi-directory=/mnt/{self._boot_partition}", boot_directory, "--removable"]

    else:
        command = ["sudo", "grub-install", "--target=i386-pc", boot_directory, f"{self._target_disk}"]

    # Without a bootloader the media cannot boot, so do not let the caller report success after a failure.
    if subprocess.run(command).returncode != 0:
        sys.exit("[!] Failed to apply the bootloader, please retry.")

create()

This method calls all the required functions in sequence to perform the necessary operations to make the bootable media.

Source code in cometusb/cometusb.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
def create(self) -> None:
    """
    This method calls all the required functions in sequence to perform the necessary operations to make the bootable media.
    """
    # Mounting newly created partitions.
    mount_usb(self._partitions)

    # Disk configuration info.
    print(f"\n[*] Disk {self._target_disk} configuration.")
    subprocess.run(["lsblk", self._target_disk, "-o", "NAME,SIZE,FSTYPE,FSVER,LABEL,MOUNTPOINTS"])

    image_name = None # Name of the OS image to rebuild, known once its first part (.aa) is seen.
    image_dir = None # Directory holding the splitted image files.

    for filename, download_dir in self.files.items():
        print() #To create gap between progress bars.
        print(f"Downloading {filename} into {download_dir}")
        downloader(self._path_url + filename, download_dir)
        print() #To create gap between progress bars.
        if filename.endswith(".zip"):
            # Extracting to create the directory tree structure
            extractor(download_dir + filename, download_dir)
            print() #To create gap between progress bars.
            os.remove(download_dir + filename) # Removing the zip file after extracting to free space in the USB.
        elif filename.endswith(".aa"):
            # removesuffix drops exactly ".aa"; rstrip(".aa") would strip every trailing "a" and ".".
            image_name, image_dir = filename.removesuffix(".aa"), download_dir

    if image_name is not None:
        image_folder = OS[self.name]["image_folder"]

        # Reconstructing original OS image from splitted image files.
        print(f"[*] Making OS Image {image_name} ready for installation.\n[*] This may take a while depending upon your removable disk {self._target_disk}.")
        # The shell is needed for the wildcard and the redirection; every name comes from the built-in OS table.
        merged = subprocess.run(f"sudo cat {image_dir}{image_name}.* > {image_dir}{image_folder}{image_name}", shell=True, stdout=subprocess.DEVNULL)
        if merged.returncode != 0:
            # Keep the splitted files: deleting them now would lose the downloaded data.
            sys.exit(f"[!] Failed to make OS Image {image_name}, please retry.")
        print(f"[*] OS image is placed in {image_folder} successfully.")

        # Removing splitted image files.
        print("[*] Cleaning unnecessary files...")
        for splitted_image_file in glob.glob(f"{image_dir}{image_name}.*"):
            os.remove(splitted_image_file)

    # Applying bootloader
    self.bootloader()

    print("Media created succesfully\nNOTE: Linux disk sometimes is not detected in BIOS, try disabling secure boot of your BIOS if facing any issues while booting.")

main()

Source code in cometusb/cometusb.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def main() -> None:
    """
    Command line entry point: parses the arguments, publishes the table of supported
    Operating Systems as the global OS, then either lists them or builds the bootable media.
    """
    parser = argparse.ArgumentParser(
                    prog= "CometUSB.",
                    description="Create linux bootable USB."
                    )
    parser.add_argument("-l", "--list-os", action="store_true", help="Shows list of the available Operating Systems.")
    parser.add_argument("-o", "--operating-system", help="Name of the Operating System.")
    parser.add_argument("-b","--bios-type", help="BIOS type (e.g., UEFI or Legacy), check what your TARGET SYSTEM supports.")
    args = parser.parse_args()

    # OS is read by Operating_System, so it has to exist before the object is created.
    global OS
    OS = {
        "linuxmint": {
            "files": [
                "boot.zip",
                "directories.zip",
                "filesystem.squashfs.aa",
                "filesystem.squashfs.ab"
            ],
            "image_folder": "casper/" # OS image is located in image_folder.
        }
        }

    # Shows list of available Operating Systems.
    if args.list_os:
        for number, os_name in enumerate(OS, start=1):
            print(number, os_name.capitalize(), sep=". ")
        sys.exit() # Exits after showing the OS list

    if not (args.operating_system and args.bios_type):
        sys.exit("Argument missing\nType cometusb -h to see the usage.")

    try:
        operating_system = Operating_System(args.operating_system.lower(), args.bios_type.lower())
        print(operating_system)
        operating_system.create()
    except PermissionError: # Handling PermissionError if not ran as sudo.
        sys.exit("\nAre you root?\nRun it with sudo. See documentation at https://CometUSB.github.io/CometUSB/usage/ for usage.")

get_disk_details()

Executes the 'lsblk' command to retrieve detailed information for physical block disks using JSON output for robust, programmatic parsing.

:return: A string of target device. :rtype: str

Source code in cometusb/cometusb.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
def get_disk_details() -> str:
    """
    Lists the removable disks reported by 'lsblk' (JSON output), prints them
    as a numbered table and asks the user to pick one.

    Exits when no removable disk is reported or when the entered number is invalid.

    :return: Device path of the chosen disk, i.e. "/dev/" followed by its name.
    :rtype: str
    """
    lsblk_command = ['lsblk', '-d', '-J', '-o', 'NAME,SIZE,VENDOR,MODEL,RM']

    # check=True makes a failing lsblk raise instead of handing over unusable output.
    completed = subprocess.run(lsblk_command, capture_output=True, text=True, check=True)
    report = json.loads(completed.stdout)

    # One table row per removable disk, the disk number comes first.
    rows = []
    for device in report.get('blockdevices', []):
        if device.get('rm', 'N/A') == True:  # noqa: E712 - equality kept on purpose, same values accepted as before.
            rows.append([
                len(rows) + 1,
                device.get('name', 'N/A'),
                device.get('size', 'N/A'),
                device.get('vendor', 'N/A'),
                device.get('model', 'N/A'),
            ])

    # Disk number -> disk name, used to resolve the user's choice.
    disk_names = {row[0]: row[1] for row in rows}

    # Checks if removable media is connected.
    if not rows:
        sys.exit("No USB/removable media found.")

    column_titles = ["Disk Number"] # Column for the disk number.
    column_titles.extend(key.capitalize() for key in report.get("blockdevices")[0].keys())
    column_titles[3] = "Interface" # Renaming Vendor column to Interface
    print(tabulate(rows, headers=column_titles, tablefmt="grid"))

    try:
        selection = int(input("Enter disk number: "))
        return f"/dev/{disk_names[selection]}"
    except (ValueError, KeyError):
        sys.exit("Invalid disk number")

format_disk(disk, bios_type, size)

Formats the target disk, converts it into GPT or MBR then create partitions and filesystems compatible for the given BIOS Type i.e, UEFI or Legacy.

:param disk: Name of target disk. :type disk: str :param bios_type: BIOS firmware type. :type bios_type: str :param size: Size of the target disk required. :type size: int :return: Dictionary mapping each partition label to its partition on the target disk. :rtype: dict

Source code in cometusb/cometusb.py
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
def format_disk(disk: str, bios_type: str, size: int) -> dict:
    """
    Formats the target disk, converts it into GPT or MBR then create partitions and filesystems compatible for the given BIOS Type i.e, UEFI or Legacy.

    For "uefi" a GPT table is written with a FAT32 partition labelled COMET_BOOT (1MiB-501MiB, esp flag)
    and an NTFS partition labelled COMET_FILES on the rest of the disk. For "legacy" an msdos table is
    written with a single NTFS partition labelled COMET carrying the boot flag.

    The process exits if the user does not type 'yes' or if any of the checked commands fails.

    :param disk: Name of target disk.
    :type disk: str
    :param bios_type: BIOS firmware type.
    :type bios_type: str
    :param size: Size of the target disk required, only shown to the user in the warning.
    :type size: int
    :return: Dictionary of all the partition of target disk with corresponding labels.
    :rtype: dict
    """
    # Confirming to Format the USB
    print(f"\n[!] MINIMUM {size} GB Disk is required.\n[*] This will ERASE all data on {disk}")
    if input("Type 'yes' to continue: ").strip().lower() != "yes":
        sys.exit("Aborted by user.")

    # Existing partitions are found as paths made of the disk name plus exactly one more character.
    # NOTE(review): presumably this targets /dev/sdX style names; names such as /dev/mmcblk0p1 would not match - confirm.
    partitions = glob.glob(disk + "?")
    unmount_usb(partitions)
    try:

        # Wipe partition table
        print(f"\n[*] Wiping disk {disk}")
        subprocess.run(["sudo", "wipefs", "-a", disk], check=True)

        print(f"\n[*] Creating partitions for {bios_type.upper()} systems...")
        if bios_type == "uefi":
            # Create new partition table and partition
            subprocess.run(["sudo", "parted", "-s", disk, "mklabel", "gpt"], check=True)
            subprocess.run(["sudo", "parted", "-s", disk, "mkpart", "primary", "1MiB", "501MiB"], check=True)
            # Only one partition exists at this point, so the first match is taken as the boot partition.
            partition = glob.glob(disk + "?")
            boot_partition = partition[0]
            subprocess.run(["sudo", "parted", "-s", disk, "mkpart", "primary", "501MiB", "100%"], check=True)
            # The files partition is whatever matches now apart from the boot partition.
            partition = glob.glob(disk + "?")
            partition.remove(boot_partition)
            files_partition = partition[0]

            # Refreshing the partitions
            subprocess.run(["sudo", "partprobe", disk], check=True)
            subprocess.run(["sudo", "udevadm", "settle"], check=True)

            # Creating the filesystems
            print(f"\n[*] Creating filesystems ...")
            subprocess.run(["sudo", "mkfs.fat", "-F", "32", "-n", "COMET_BOOT", boot_partition], check=True)
            subprocess.run(["sudo", "parted", "-s", disk, "set", "1", "esp", "on"], check=True)
            subprocess.run(["sudo", "mkfs.ntfs", "-f", files_partition, "-L", "COMET_FILES"], check=True)

        elif bios_type == "legacy":
            # Create new partition table and partition
            subprocess.run(["sudo", "parted", "-s", disk, "mklabel", "msdos"], check=True)
            subprocess.run(["sudo", "parted", "-s", disk, "mkpart", "primary", "0%", "100%"], check=True)
            partition = glob.glob(disk + "?")

            files_partition = partition[0] # Only one partition is here same for installation and boot files.

            # Refreshing the partitions
            subprocess.run(["sudo", "partprobe", disk], check=True)
            subprocess.run(["sudo", "udevadm", "settle"], check=True)

            # Creating the filesystems
            print(f"\n[*] Creating filesystems ...")

            subprocess.run(["sudo", "mkfs.ntfs", "-f", files_partition, "-L", "COMET"], check=True)
            subprocess.run(["sudo", "parted", "-s", disk, "set", "1", "boot", "on"], check=True)

        print(f"[*] USB {disk} formatted successfully!\n")

    except subprocess.CalledProcessError:
        # Every command above runs with check=True, so any failure ends up here.
        sys.exit("[*] Something went wrong, please retry.")

    # The layout is decided from the number of partitions found now, not from bios_type.
    if len(glob.glob(disk + "?")) == 2:
        return {"COMET_BOOT": boot_partition, "COMET_FILES": files_partition}
    else:
        return {"COMET": files_partition}

unmount_usb(partitions)

Unmounts all the existing partitions of the target disk so that the format_disk function can start the formatting process.

:param partitions: List of all the partitions in the target disk. :type partitions: list

Source code in cometusb/cometusb.py
360
361
362
363
364
365
366
367
368
369
370
def unmount_usb(partitions: list) -> None:
    """
    Force-unmounts every given partition so that format_disk can work on the target disk.
    The result of umount is not checked.

    :param partitions: List of all the partitions in the target disk.
    :type partitions: list
    """
    for device_node in partitions:
        print(f"Unmounting: {device_node}")
        umount_command = ["sudo", "umount", "-f", device_node]
        subprocess.run(umount_command)

mount_usb(partitions)

Mounts all the newly created partitions of the target disk by format_disk function.

:param partitions: Dictionary of all the newly created partition of target disk with corresponding labels. :type partitions: dict

Source code in cometusb/cometusb.py
372
373
374
375
376
377
378
379
380
381
382
383
384
def mount_usb(partitions: dict) -> None:
    """
    Mounts all the newly created partitions of the target disk by format_disk function.
    Every partition is mounted on /mnt/<label>; the process exits if a mount fails.

    :param partitions: Dictionary of all the newly created partition of target disk with corresponding labels.
    :type partitions: dict
    """
    for part_label, device in partitions.items():
        mount_point = f"/mnt/{part_label}"
        print(f"Mounting: {device} on {mount_point}")
        subprocess.run(["sudo", "mkdir", "-p", mount_point])
        result = subprocess.run(["sudo", "mount", device, mount_point])
        # mount returns 0 only on success. Any other status means the partition is not mounted,
        # and continuing would write the downloads into /mnt on the host disk instead of the USB.
        if result.returncode != 0:
            sys.exit(f"\n[*] Failed to mount {device} on {mount_point}.\n[*] Please retry...")

downloader(url, download_dir)

Downloads all the files from a given URL.

:param url: Download URL of individual files. :type url: str :param download_dir: Download location of the given files. :type download_dir: str

Source code in cometusb/cometusb.py
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
def downloader(url: str, download_dir: str) -> None:
    """
    Streams the file behind a URL into the download directory while showing a progress bar.
    The file keeps the last path component of the URL as its name.

    :param url: Download URL of individual files.
    :type url: str
    :param download_dir: Download location of the given files.
    :type download_dir: str
    """
    file_name = os.path.basename(url)
    block_bytes: int = 200 * 1024 # 200KB chunk for smoother progress update.

    with requests.get(url, stream = True) as reply:
        reply.raise_for_status()
        # Falls back to 0 when the server does not report the size.
        expected_bytes: int = int(reply.headers.get("content-length", 0))

        with open(f"{download_dir}{file_name}", "wb") as target:
            with tqdm(
                total = expected_bytes,
                unit = "B",
                unit_scale = True,
                unit_divisor = 1024,
                desc = file_name
            ) as bar:
                for piece in reply.iter_content(chunk_size = block_bytes):
                    # write returns the number of bytes written, which advances the bar.
                    bar.update(target.write(piece))

extractor(archive_path, extract_dir)

Extracts the compressed archive to given location.

:param archive_path: Complete path of the compressed archives. :type archive_path: str :param extract_dir: Extract folder for the corresponding archive. :type extract_dir: str

Source code in cometusb/cometusb.py
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
def extractor(archive_path: str, extract_dir: str) -> None:
    """
    Unpacks every member of a zip archive into the given directory while showing a progress bar
    measured in uncompressed bytes.

    :param archive_path: Complete path of the compressed archives.
    :type archive_path: str
    :param extract_dir: Extract folder for the corresponding archive.
    :type extract_dir: str
    """
    with zf.ZipFile(archive_path, "r") as bundle:
        members = bundle.infolist()
        uncompressed_total = sum(member.file_size for member in members)

        with tqdm(
            total = uncompressed_total,
            unit = "B",
            unit_scale = True,
            unit_divisor = 1024,
            desc = f"Extracting {os.path.basename(archive_path)}"
        ) as bar:
            for member in members:
                bundle.extract(member, path = extract_dir)
                bar.update(member.file_size)