538 lines
21 KiB
Python
Executable File
538 lines
21 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import numpy as np
|
|
from PIL import Image, ImageOps
|
|
import csv, sys, os, time
|
|
import inat_taxonomy
|
|
|
|
try:
|
|
# try importing TensorFlow Lite first
|
|
import tflite_runtime.interpreter as tflite
|
|
except Exception:
|
|
try:
|
|
# TensorFlow Lite not found, try to import full TensorFlow
|
|
import tensorflow.lite as tflite
|
|
except Exception:
|
|
print('Error: TensorFlow Lite could not be loaded.', file=sys.stderr)
|
|
print(' Follow instructions at https://www.tensorflow.org/lite/'
|
|
'guide/python to install it.', file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
# Directory where this Python script is located (as provided by
# inat_taxonomy).
INSTALL_DIR = inat_taxonomy.INSTALL_DIR

# Models, label files, and taxonomy files are all kept in this directory.
CLASSIFIER_DIRECTORY = os.path.join(INSTALL_DIR, 'classifiers')

# Module-level flags; command-line options can modify them.
scientific_names_only = False  # only scientific names or also common names
label_scores_only = False      # scores for labels or hierarchical
all_common_names = False       # show only one or all common names
result_sz = 5                  # result size (for label_scores_only)
# This class is used by class Taxonomy.
class Taxon:
    """One node of the taxonomic tree maintained by class Taxonomy."""

    def __init__(self, taxon_id):
        """Create a taxon without name, rank, children or leaf classes."""
        self.taxon_id = taxon_id   # for internal lookups and iNat API calls
        self.rank_level = None     # taxonomic rank, e.g. species, genus, family
        self.name = None           # scientific name
        self.common_name = None    # common name or None
        self.children = []         # list of child taxa
        self.leaf_class_ids = []   # list of indices into scores; there
                                   # can be more than one when we use old models
                                   # whose taxa have since been lumped together
        # Accumulated score of this taxon and everything below it.
        # Taxonomy.assign_scores() overwrites it for every prediction; it is
        # declared here so that the attribute exists on every instance.
        self.score = 0.0

    def __repr__(self):
        return (f'{type(self).__name__}(taxon_id={self.taxon_id!r}, '
                f'name={self.name!r}, rank_level={self.rank_level!r})')

    def add_child(self, child_taxon):
        """Append child_taxon to the list of children."""
        self.children.append(child_taxon)

    # get taxonomic rank as a string
    def get_rank(self):
        """Return the rank name; pseudo-kingdoms (negative id) have none."""
        if self.taxon_id < 0:  # pseudo-kingdom?
            assert self.rank_level == inat_taxonomy.KINGDOM_RANK_LEVEL
            return ''
        return inat_taxonomy.get_rank_name(self.rank_level)

    # get the name to display; customize here to show common names differently
    def get_name(self):
        """Return 'common name (scientific name)' or the scientific name."""
        if self.common_name:
            return f'{self.common_name} ({self.name})'
        return self.name
# This taxonomy is represented in terms of instances of class Taxon.
class Taxonomy:
    """Taxonomic tree of Taxon instances plus the labels of the scores.

    Attributes:
        root: Taxon that serves as parent of all kingdoms.
        id2taxon: dict that maps a taxon id to its Taxon; read_taxonomy()
            deletes it once only the labels are needed.
        idx2label: dict that maps an index into the scores to a label.
    """

    def __init__(self):
        # The taxonomy file may contain multiple trees, one for each kingdom.
        # In order to have a single tree for prediction, we add a node for
        # Life as the parent of all kingdoms. This will be the root of our tree.
        self.root = Taxon(inat_taxonomy.ROOT_TAXON_ID)
        self.root.name = inat_taxonomy.ROOT_NAME
        self.root.rank_level = inat_taxonomy.ROOT_RANK_LEVEL
        self.id2taxon = {self.root.taxon_id: self.root}
        self.idx2label = {}

    def reset(self):
        """Drop all taxa except the root and forget all labels."""
        self.root.children = []
        self.id2taxon = {self.root.taxon_id: self.root}
        self.idx2label = {}

    def taxonomy_available(self):
        """Return True if the root has at least one child taxon."""
        return len(self.root.children) > 0

    @staticmethod
    def _taxonomy_filename(label_filename):
        """Return the taxonomy file name derived from a label file name.

        Only the last path component is rewritten ('labelmap' becomes
        'taxonomy'). Returns None when that component does not contain
        'labelmap'; the derived name would then be the label file itself.
        """
        directory, basename = os.path.split(label_filename)
        new_basename = basename.replace('labelmap', 'taxonomy')
        if new_basename == basename:
            return None
        return os.path.join(directory, new_basename)

    def read_taxonomy(self, filename):
        """Load a taxonomy file or a label file in csv format.

        Rows that have an 'id' column are label rows (id, name); all other
        rows are taxonomy rows (parent_taxon_id, taxon_id, rank_level,
        leaf_class_id, name). For a label file the taxonomic tree is computed
        from the labels and written to a taxonomy file, unless
        label_scores_only is set.
        """
        start_time = time.time()
        self.reset()
        with open(filename, newline='', encoding='latin-1') as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                if 'id' in row:  # this is a label file
                    self.idx2label[int(row['id'])] = row['name']
                    continue

                taxon_id = int(row['taxon_id'])
                if taxon_id in self.id2taxon:
                    taxon = self.id2taxon[taxon_id]  # inserted earlier as parent
                else:
                    self.id2taxon[taxon_id] = taxon = Taxon(taxon_id)

                taxon.name = row['name']
                if row['rank_level'].isdigit():
                    taxon.rank_level = int(row['rank_level'])
                else:
                    taxon.rank_level = float(row['rank_level'])

                if len(row['leaf_class_id']):
                    for leaf_class_id in row['leaf_class_id'].split(';'):
                        leaf_class_id = int(leaf_class_id)
                        taxon.leaf_class_ids.append(leaf_class_id)
                        self.idx2label[leaf_class_id] = taxon.name

                if len(row['parent_taxon_id']):
                    parent_taxon_id = int(row['parent_taxon_id'])
                else:
                    parent_taxon_id = self.root.taxon_id
                if parent_taxon_id not in self.id2taxon:
                    # placeholder; filled in when the parent's own row is read
                    self.id2taxon[parent_taxon_id] = Taxon(parent_taxon_id)

                self.id2taxon[parent_taxon_id].add_child(taxon)

        if not self.taxonomy_available():
            # We parsed a label file; unless told otherwise, we use these
            # labels to build a taxonomic tree.
            print(f"Read {len(self.idx2label):,} labels from '{filename}' "
                  f"in {time.time() - start_time:.1f} secs.")

            if not label_scores_only:
                self.compute_taxonomic_tree()
                if self.taxonomy_available():
                    taxonomy_filename = self._taxonomy_filename(filename)
                    if taxonomy_filename is None:
                        # Never overwrite the label file we have just read.
                        print(f"Info: Taxonomy not written, name of label "
                              f"file '{filename}' does not contain "
                              "'labelmap'.")
                    else:
                        self.write_taxonomic_tree(taxonomy_filename)
        else:
            print(f"Read taxonomy from '{filename}' in "
                  f"{time.time() - start_time:.1f} secs: "
                  f"{len(self.id2taxon) - 1:,} taxa including "
                  f"{len(self.idx2label):,} leaf taxa.")

        if not scientific_names_only and self.taxonomy_available():
            inat_taxonomy.annotate_common_names(self.id2taxon, all_common_names)
            if label_scores_only:
                self.annotate_labels_with_common_names()
                del self.id2taxon  # not needed anymore

    # augment labels with common names
    def annotate_labels_with_common_names(self):
        """Replace the label of each leaf class with its taxon's get_name()."""
        for taxon in self.id2taxon.values():
            for leaf_class_id in taxon.leaf_class_ids:
                self.idx2label[leaf_class_id] = taxon.get_name()

    # write one row to taxonomy file
    def write_row(self, writer, taxon, parent_taxon_id):
        """Write the row for taxon, then recursively the rows of its children."""
        writer.writerow([parent_taxon_id, taxon.taxon_id, taxon.rank_level,
                         ';'.join(str(leaf_id)
                                  for leaf_id in taxon.leaf_class_ids),
                         taxon.name])
        for child in taxon.children:
            self.write_row(writer, child, taxon.taxon_id)

    # write taxonomy file
    def write_taxonomic_tree(self, filename):
        """Write all taxa below the root to csv file filename.

        A failure is reported with a message and the possibly incomplete
        file is removed; no exception is propagated.
        """
        try:
            with open(filename, 'w', newline='', encoding='latin-1') as csvfile:
                writer = csv.writer(csvfile)
                writer.writerow(['parent_taxon_id', 'taxon_id', 'rank_level',
                                 'leaf_class_id', 'name'])
                for child in self.root.children:
                    self.write_row(writer, child, '')
                print(f"Taxonomy written to file '{filename}'.")
        except Exception as e:
            print(f"Failure writing taxonomy to file '{filename}':", str(e))
            try:
                os.remove(filename)
            except OSError:
                pass  # best effort; the file may not have been created

    # Called after loading label file for Google's AIY Vision Kit.
    # Adds all the labels' direct and indirect ancestors to compute
    # the taxonomic tree.
    def compute_taxonomic_tree(self):
        """Build the tree for the labels in idx2label.

        Sets the global label_scores_only and returns without a tree when
        inat_taxonomy.load_inat_taxonomy() reports failure. Labels that
        inat_taxonomy.lookup_id() does not find are inserted directly below
        the root as pseudo-kingdoms with negative taxon ids.
        """
        global label_scores_only
        if not inat_taxonomy.load_inat_taxonomy():
            label_scores_only = True
            return

        start_time = time.time()
        new_id = 0  # id's we add on the fly for pseudo-kingdoms

        for idx, name in self.idx2label.items():
            inat_taxa = inat_taxonomy.lookup_id(name)
            if not inat_taxa:
                print(f"Info: Taxon for label '{name}' not found, "
                      "inserting as pseudo-kingdom.")
                new_id -= 1
                taxon_id = new_id
                self.id2taxon[taxon_id] = taxon = Taxon(taxon_id)
                taxon.rank_level = inat_taxonomy.KINGDOM_RANK_LEVEL
                taxon.name = name
                taxon.leaf_class_ids = [idx]
                self.root.add_child(taxon)
                continue

            inat_taxon, ancestors = inat_taxa
            if name != inat_taxon.name:
                print(f"Info: Taxon '{name}' changed to "
                      f"'{inat_taxon.name}', iNat taxa "
                      f"id {inat_taxon.id}.")

            # ancestor taxa
            prev_ancestor = self.root
            for ancestor in ancestors:
                if ancestor.id in self.id2taxon:
                    prev_ancestor = self.id2taxon[ancestor.id]
                else:
                    self.id2taxon[ancestor.id] = ancestor_taxon = \
                        Taxon(ancestor.id)
                    ancestor_taxon.name = ancestor.name
                    ancestor_taxon.rank_level = ancestor.rank_level
                    prev_ancestor.add_child(ancestor_taxon)
                    prev_ancestor = ancestor_taxon

            # this taxon
            if inat_taxon.id in self.id2taxon:
                taxon = self.id2taxon[inat_taxon.id]
                assert taxon.name == inat_taxon.name
                assert taxon.rank_level == inat_taxon.rank_level
            else:
                self.id2taxon[inat_taxon.id] = taxon = Taxon(inat_taxon.id)
                taxon.name = inat_taxon.name
                taxon.rank_level = inat_taxon.rank_level
                prev_ancestor.add_child(taxon)
            taxon.leaf_class_ids.append(idx)

        print("Computed taxonomic tree from labels in "
              f"{time.time() - start_time:.1f} secs: {len(self.id2taxon)-1:,} "
              f"taxa including {len(self.idx2label):,} leaf taxa.")

    # propagate scores to taxon and all below
    def assign_scores(self, taxon, scores):
        """Set taxon.score to the sum of its own leaf scores and the scores
        of all its descendants; does the same for every taxon below."""
        taxon.score = 0.0
        for leaf_class_id in taxon.leaf_class_ids:
            taxon.score += scores[leaf_class_id]
        for child in taxon.children:
            self.assign_scores(child, scores)
            taxon.score += child.score

    # Returns list of 4-tuples (score, taxon_id, taxonomic rank, name)
    # ordered by taxonomic rank from kingdom down to e.g. species.
    # Returns pairs (score, label) if label_scores_only is set.
    def prediction(self, scores):
        """Turn the scores of one classification into a result list.

        With label_scores_only: up to result_sz pairs (score / sum of all
        scores, label) for the highest non-zero scores, best first.
        Otherwise: one path through the tree as 4-tuples (score relative to
        the root's score, taxon id, rank, name); the path stops where the
        best child holds less than half of its parent's score.
        """
        if label_scores_only:
            # np.argpartition raises ValueError when asked for more elements
            # than there are scores, hence the clamp.
            count = min(result_sz, len(scores))
            if count <= 0:
                return []
            total = np.sum(scores)
            indices = np.argpartition(scores, -count)[-count:]
            results = [(scores[i] / total, self.idx2label[i])
                       for i in indices if scores[i] != 0]
            results.sort(reverse=True)
            return results

        # annotate all taxa across the hierarchy with scores.
        self.assign_scores(self.root, scores)

        # return one hierarchical path guided by scores
        path = []
        if self.root.score == 0:
            return path  # nothing scored; avoids dividing by zero below

        taxon = self.root
        while taxon.children:
            # Find child with highest score (the first one on ties).
            best_child = max(taxon.children, key=lambda child: child.score)

            # Truncate path if the best child has less than half of the
            # score of its parent.
            if best_child.score < 0.5 * taxon.score:
                break

            path.append((best_child.score / self.root.score,
                         best_child.taxon_id, best_child.get_rank(),
                         best_child.get_name()))
            taxon = best_child

        return path
#
# Offline image classification.
#

class OfflineClassifier:
    """Classifies images with a TensorFlow Lite model and a Taxonomy."""

    def __init__(self, filenames):
        """Load the model and its labels or taxonomy.

        filenames: pair (path of .tflite model, path of .csv label or
        taxonomy file).
        """
        # Range the pixel values are scaled to for float32 models.
        self.min_pixel_value = 0.0
        self.max_pixel_value = 255.0

        if os.path.basename(filenames[0]) in ('optimized_model.tflite',
                                              'optimized_model_v1.tflite'):
            self.min_pixel_value = -1.0
            self.max_pixel_value = 1.0

        # Load TFLite model and allocate tensors.
        self.mInterpreter = tflite.Interpreter(model_path=filenames[0])
        self.mInterpreter.allocate_tensors()

        # Get input and output tensors.
        self.mInput_details = self.mInterpreter.get_input_details()
        self.mOutput_details = self.mInterpreter.get_output_details()

        # Read labels or taxonomy
        self.mTaxonomy = Taxonomy()
        self.mTaxonomy.read_taxonomy(filenames[1])

    def classify_image(self, image_filename):
        """Classify one image file.

        Returns the result of Taxonomy.prediction() for the image, or an
        empty list (after printing an error message) when the image cannot
        be loaded.
        """
        start_time = time.time()
        try:
            # The context manager closes the file; everything that needs the
            # file (decoding, EXIF access) happens inside of it.
            with Image.open(image_filename) as opened:
                opened.load()

                # rotate image if needed as it may contain EXIF orientation tag
                img = ImageOps.exif_transpose(opened)

                # The model input has three color channels; grayscale,
                # palette and RGBA images are converted instead of rejected.
                if img.mode != 'RGB':
                    img = img.convert('RGB')
        except Exception:
            # Not a bare 'except:' so that Ctrl-C and sys.exit() still work.
            print(f"Error: cannot load image '{image_filename}'.")
            return []

        model_size = tuple(int(dim)
                           for dim in self.mInput_details[0]['shape'][1:3])

        # square target shape expected by crop code below
        assert model_size[0] == model_size[1]

        if img.size != model_size:
            # We need to scale and maybe want to crop image.
            width, height = img.size
            if width != height:
                # Before scaling, we crop image to square shape.
                left = 0
                right = width
                top = 0
                bottom = height
                if width < height:
                    top = (height - width) / 2
                    bottom = top + width
                else:
                    left = (width - height) / 2
                    right = left + height
                img = img.crop((left, top, right, bottom))

            # scale image
            img = img.resize(model_size)

        # pixels are in range 0 ... 255, turn into numpy array
        input_data = np.array([np.array(img, self.mInput_details[0]['dtype'])])

        if self.mInput_details[0]['dtype'] == np.float32:
            # map 0 ... 255 to min_pixel_value ... max_pixel_value
            input_data *= (self.max_pixel_value - self.min_pixel_value) / 255.0
            input_data += self.min_pixel_value

        self.mInterpreter.set_tensor(self.mInput_details[0]['index'],
                                     input_data)
        self.mInterpreter.invoke()

        output_data = self.mInterpreter.get_tensor(self.mOutput_details[0]
                                                   ['index'])
        path = self.mTaxonomy.prediction(output_data[0])
        print()
        print(f"Classification of '{image_filename}' took "
              f"{time.time() - start_time:.1f} secs.")
        return path
# Returns the name of the classifier a file belongs to or None.
def _classifier_name(filename, well_known_files):
    """Map a file name to a classifier name.

    well_known_files maps exact file names to classifier names and takes
    precedence; otherwise the first of 'birds', 'insects', 'plants' that
    occurs in filename is returned. Returns None if nothing matches.
    """
    if filename in well_known_files:
        return well_known_files[filename]
    for choice in ('birds', 'insects', 'plants'):
        if choice in filename:
            return choice
    return None


# Returns a dictionary that maps available classifiers to a pair of filenames.
def get_installed_models():
    """Scan CLASSIFIER_DIRECTORY for classifiers.

    Returns a dict that maps a classifier name to a pair (path of .tflite
    file, path of .csv file). When a classifier has several .csv files, a
    file whose name does not end in 'labelmap.csv' is kept over one that
    does. Classifiers with only one of the two files are reported and
    excluded. Exits with status 1 if the directory does not exist or no
    complete classifier is found.
    """
    if not os.path.isdir(CLASSIFIER_DIRECTORY):
        # fatal errors go to stderr, like the one at the end of this function
        print("Cannot load classifiers, directory "
              f"'{CLASSIFIER_DIRECTORY}' does not exist.", file=sys.stderr)
        sys.exit(1)

    csv_names = {'taxonomy_v2_13.csv': 'v2_13',
                 'taxonomy_v1.csv': 'Seek'}
    tflite_names = {'optimized_model_v2_13.tflite': 'v2_13',
                    'optimized_model_v1.tflite': 'Seek'}
    models = {}

    for filename in os.listdir(CLASSIFIER_DIRECTORY):
        is_csv = filename.endswith(".csv")
        if not is_csv and not filename.endswith(".tflite"):
            continue
        name = _classifier_name(filename,
                                csv_names if is_csv else tflite_names)
        if not name:
            continue

        path = os.path.join(CLASSIFIER_DIRECTORY, filename)
        model_file, csv_file = models.get(name, (None, None))
        if not is_csv:
            model_file = path
        elif not csv_file or csv_file.endswith('labelmap.csv'):
            # first .csv file seen, or it replaces a label file
            csv_file = path
        models[name] = (model_file, csv_file)

    complete_models = {}
    for name, files in models.items():
        if files[0] and files[1]:
            complete_models[name] = files
            continue
        tf_missing = ".csv file but no .tflite file"
        csv_missing = ".tflite file but no .csv file"
        print("Installation issue: Excluding incomplete classifier for"
              f" '{name}': {tf_missing if files[1] else csv_missing}.")

    if not complete_models:
        print(f"No classifiers found in directory '{CLASSIFIER_DIRECTORY}'; "
              "follow instructions in "
              f"'{os.path.join(CLASSIFIER_DIRECTORY,'README.md')}'"
              " to install them.", file=sys.stderr)
        sys.exit(1)
    return complete_models
def identify_species(classifier, filename):
    """Classify image file filename and print one line per result entry.

    Entries are either pairs (score, label) or tuples (score, taxon id,
    taxonomic rank, name); nothing is printed for an empty result.
    """
    entries = classifier.classify_image(filename)
    if not entries:
        return

    # Tuples (score, taxon id, taxonomic rank, name) are ordered by
    # taxonomic rank from kingdom down to species.
    for entry in entries:
        percentage = 100 * entry[0]
        if len(entry) == 2:  # labels only
            print(f'{percentage:5.1f}% {entry[1]}')
        else:
            print(f'{percentage:5.1f}% {entry[2]:11s} {entry[3]}')
# command-line parsing

# Installed classifiers: name -> pair of filenames, determined once at
# module load.
models = get_installed_models()
def model_parameter_check(arg):
    """argparse type function for the model option.

    Returns arg if it names one of the installed models; otherwise raises
    argparse.ArgumentTypeError with a message that lists the available
    models.
    """
    # Imported here because the module only imports argparse when run as a
    # script; without this the function fails with NameError when the
    # module is imported.
    import argparse

    if arg in models:
        return arg

    msg = f"Model '{arg}' not available. Available "\
          f"model{'' if len(models) == 1 else 's'}:"
    if models:
        msg += ' ' + ', '.join(f"'{m}'" for m in models)
    msg += '.'
    raise argparse.ArgumentTypeError(msg)
def result_size_check(arg):
    """argparse type function for the result size.

    Returns arg as an int if it is a decimal number between 1 and 100;
    raises argparse.ArgumentTypeError otherwise.
    """
    # Imported here because the module only imports argparse when run as a
    # script.
    import argparse

    # str.isdecimal() rather than str.isdigit(): isdigit() also accepts
    # characters such as superscript digits which int() cannot convert.
    if arg.isdecimal():
        size = int(arg)
        if 0 < size <= 100:
            return size
    raise argparse.ArgumentTypeError(f"'{arg}' is not a number "
                                     "between 1 and 100.")
def file_directory_check(arg):
    """argparse type function for the positional arguments.

    Returns arg if it is an existing file or directory; raises
    argparse.ArgumentTypeError otherwise.
    """
    # Imported here because the module only imports argparse when run as a
    # script.
    import argparse

    if os.path.isdir(arg) or os.path.isfile(arg):
        return arg
    raise argparse.ArgumentTypeError(f"'{arg}' is not a file or directory.")
#
# Identify species for picture files and directories given as command line args
#

# Extensions (in lower case) of the files that are classified when a
# directory is given on the command line.
IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')


def is_image_file(filename):
    """Return True if filename's extension, ignoring case, is one of
    IMAGE_EXTENSIONS."""
    return os.path.splitext(filename)[1].lower() in IMAGE_EXTENSIONS


if __name__ == '__main__':
    import argparse

    preferred1 = 'v2_13'  # default if this model is available
    preferred2 = 'Seek'   # second preference

    parser = argparse.ArgumentParser()
    if len(models) == 1 or preferred1 in models or preferred2 in models:
        default_model = preferred1 if preferred1 in models else \
                        preferred2 if preferred2 in models else \
                        next(iter(models))
        parser.add_argument("-m", "--model", type=model_parameter_check,
                            default=default_model,
                            help="Model to load to identify organisms.")
    else:  # no default for classification model
        parser.add_argument("-m", "--model", type=model_parameter_check,
                            required=True,
                            help="Model to load to identify organisms.")
    parser.add_argument('-a', '--all_common_names', action="store_true",
                        help='Show all common names and not just one.')
    parser.add_argument('-l', '--label_scores_only', action="store_true",
                        help='Compute and display only label scores, '
                        'do not propagate scores up the hierarchy.')
    parser.add_argument('-s', '--scientific_names_only', action="store_true",
                        help='Only use scientific names, do not load common '
                        'names.')
    parser.add_argument('-r', '--result_size', type=result_size_check,
                        default=result_sz, help='Number of labels and their '
                        'scores to report in results.')
    parser.add_argument('files_dirs', metavar='file/directory',
                        type=file_directory_check, nargs='+',
                        help='Image files or directories with images.')
    args = parser.parse_args()

    # The classes above read these module-level flags.
    scientific_names_only = args.scientific_names_only
    label_scores_only = args.label_scores_only
    all_common_names = args.all_common_names
    result_sz = args.result_size

    # make classifier instance

    classifier = OfflineClassifier(models[args.model])

    # process photos

    for arg in args.files_dirs:
        if os.path.isfile(arg):
            identify_species(classifier, arg)
        elif os.path.isdir(arg):
            # sorted: os.listdir() returns entries in arbitrary order
            for file in sorted(os.listdir(arg)):
                if is_image_file(file):
                    identify_species(classifier, os.path.join(arg, file))