Skip to content

Commit

Permalink
Refactoring of gff2db
Browse files Browse the repository at this point in the history
* Improve gff2db to parse multiple files, and try multi-threading

* Add Util:getFieldsOfLine to skip over tabs

* Correctly deal with GFF3 column 2 that can contain whitespace

* Allow multiple GFF feature types in gff2db

* Make identifiers stable in gff2db

* Improved log output in gff2db

Co-authored-by: Milot Mirdita <milot@mirdita.de>
  • Loading branch information
RuoshiZhang and milot-mirdita committed Jul 5, 2021
1 parent d822533 commit 488df86
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 96 deletions.
4 changes: 2 additions & 2 deletions src/MMseqsBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1146,8 +1146,8 @@ std::vector<Command> baseCommands = {
"Extract regions from a sequence database based on a GFF3 file",
NULL,
"Milot Mirdita <milot@mirdita.de>",
"<i:gff3File> <i:sequenceDB> <o:sequenceDB>",
CITATION_MMSEQS2, {{"gff3File", DbType::ACCESS_MODE_INPUT, DbType::NEED_DATA, &DbValidator::flatfile },
"<i:gff3File1> ... <i:gff3FileN> <i:sequenceDB> <o:sequenceDB>",
CITATION_MMSEQS2, {{"gff3File", DbType::ACCESS_MODE_INPUT, DbType::NEED_DATA | DbType::VARIADIC, &DbValidator::flatfile },
{"sequenceDB", DbType::ACCESS_MODE_INPUT, DbType::NEED_DATA, &DbValidator::sequenceDb },
{"sequenceDB", DbType::ACCESS_MODE_OUTPUT, DbType::NEED_DATA, &DbValidator::sequenceDb }}},
{"maskbygff", maskbygff, &par.gff2db, COMMAND_SPECIAL,
Expand Down
3 changes: 2 additions & 1 deletion src/commons/Parameters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ Parameters::Parameters():
PARAM_SEQUENCE_SPLIT_MODE(PARAM_SEQUENCE_SPLIT_MODE_ID, "--sequence-split-mode", "Sequence split mode", "Sequence split mode 0: copy data, 1: soft link data and write new index,", typeid(int), (void *) &sequenceSplitMode, "^[0-1]{1}$"),
PARAM_HEADER_SPLIT_MODE(PARAM_HEADER_SPLIT_MODE_ID, "--headers-split-mode", "Header split mode", "Header split mode: 0: split position, 1: original header", typeid(int), (void *) &headerSplitMode, "^[0-1]{1}$"),
// gff2db
PARAM_GFF_TYPE(PARAM_GFF_TYPE_ID, "--gff-type", "GFF type", "Type in the GFF file to filter by", typeid(std::string), (void *) &gffType, ""),
PARAM_GFF_TYPE(PARAM_GFF_TYPE_ID, "--gff-type", "GFF type", "Comma separated list of feature types in the GFF file to select", typeid(std::string), (void *) &gffType, ""),
// translatenucs
PARAM_TRANSLATION_TABLE(PARAM_TRANSLATION_TABLE_ID, "--translation-table", "Translation table", "1) CANONICAL, 2) VERT_MITOCHONDRIAL, 3) YEAST_MITOCHONDRIAL, 4) MOLD_MITOCHONDRIAL, 5) INVERT_MITOCHONDRIAL, 6) CILIATE\n9) FLATWORM_MITOCHONDRIAL, 10) EUPLOTID, 11) PROKARYOTE, 12) ALT_YEAST, 13) ASCIDIAN_MITOCHONDRIAL, 14) ALT_FLATWORM_MITOCHONDRIAL\n15) BLEPHARISMA, 16) CHLOROPHYCEAN_MITOCHONDRIAL, 21) TREMATODE_MITOCHONDRIAL, 22) SCENEDESMUS_MITOCHONDRIAL\n23) THRAUSTOCHYTRIUM_MITOCHONDRIAL, 24) PTEROBRANCHIA_MITOCHONDRIAL, 25) GRACILIBACTERIA, 26) PACHYSOLEN, 27) KARYORELICT, 28) CONDYLOSTOMA\n 29) MESODINIUM, 30) PERTRICH, 31) BLASTOCRITHIDIA", typeid(int), (void *) &translationTable, "^[1-9]{1}[0-9]*$", MMseqsParameter::COMMAND_MISC | MMseqsParameter::COMMAND_EXPERT),
// createseqfiledb
Expand Down Expand Up @@ -749,6 +749,7 @@ Parameters::Parameters():
// gff2db
gff2db.push_back(&PARAM_GFF_TYPE);
gff2db.push_back(&PARAM_ID_OFFSET);
gff2db.push_back(&PARAM_THREADS);
gff2db.push_back(&PARAM_V);
Expand Down
36 changes: 34 additions & 2 deletions src/commons/Util.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,17 @@ class Util {

static bool getLine(const char* data, size_t dataLength, char* buffer, size_t bufferLength);

static inline size_t skipWhitespace(const char* data){
static inline size_t skipWhitespace(const char* data) {
size_t counter = 0;
while((data[counter] == ' ' || data[counter] == '\t') == true) {
while ((data[counter] == ' ' || data[counter] == '\t') == true) {
counter++;
}
return counter;
}

static inline size_t skipTab(const char* data) {
size_t counter = 0;
while (data[counter] == '\t') {
counter++;
}
return counter;
Expand Down Expand Up @@ -205,6 +213,14 @@ class Util {
return counter;
}

static inline size_t skipNonTab(const char * data) {
size_t counter = 0;
while (data[counter] != '\t' && data[counter] != '\n' && data[counter] != '\0') {
counter++;
}
return counter;
}

static std::pair<std::string, std::string> createTmpFileNames(const std::string &db,
const std::string &dbindex, int count){
// TODO take only db and not db and dbindex
Expand Down Expand Up @@ -245,6 +261,22 @@ class Util {
return elementCounter;
}

static inline size_t getFieldsOfLine(const char* data, const char** words, size_t maxElement) {
size_t elementCounter = 0;
while (*data != '\n' && *data != '\0') {
data += skipTab(data);
words[elementCounter] = data;
elementCounter++;
if (elementCounter >= maxElement)
return elementCounter;
data += skipNonTab(data);
}
if (elementCounter < maxElement)
words[elementCounter] = data;

return elementCounter;
}


static std::pair<ssize_t,ssize_t> getFastaHeaderPosition(const std::string & header);
static std::string parseFastaHeader(const char * header);
Expand Down
269 changes: 178 additions & 91 deletions src/util/gff2db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,118 +4,205 @@
#include "Util.h"
#include "MemoryMapped.h"
#include "Orf.h"
#include "Parameters.h"

#ifdef OPENMP
#include <omp.h>
#endif

int gff2db(int argc, const char **argv, const Command &command) {
Parameters &par = Parameters::getInstance();
par.parseParameters(argc, argv, command, true, 0, 0);
par.parseParameters(argc, argv, command, true, Parameters::PARSE_VARIADIC, 0);

MemoryMapped file(par.db1, MemoryMapped::WholeFile, MemoryMapped::SequentialScan);
if (!file.isValid()) {
Debug(Debug::ERROR) << "Could not open GFF file " << par.db1 << "\n";
EXIT(EXIT_FAILURE);
}
char *data = (char *) file.getData();
std::string outDb = par.filenames.back();
par.filenames.pop_back();
std::string seqDb = par.filenames.back();
par.filenames.pop_back();

DBReader<unsigned int> reader(par.db2.c_str(), par.db2Index.c_str(), 1, DBReader<unsigned int>::USE_INDEX | DBReader<unsigned int>::USE_DATA | DBReader<unsigned int>::USE_LOOKUP_REV);
DBReader<unsigned int> reader(seqDb.c_str(), (seqDb + ".index").c_str(), par.threads, DBReader<unsigned int>::USE_INDEX | DBReader<unsigned int>::USE_DATA | DBReader<unsigned int>::USE_LOOKUP_REV);
reader.open(DBReader<unsigned int>::NOSORT);
DBReader<unsigned int> headerReader(par.hdr2.c_str(), par.hdr2Index.c_str(), 1, DBReader<unsigned int>::USE_INDEX | DBReader<unsigned int>::USE_DATA);
DBReader<unsigned int> headerReader((seqDb + "_h").c_str(), (seqDb + "_h.index").c_str(), par.threads, DBReader<unsigned int>::USE_INDEX | DBReader<unsigned int>::USE_DATA);
headerReader.open(DBReader<unsigned int>::NOSORT);

DBWriter writer(par.db3.c_str(), par.db3Index.c_str(), 1, par.compressed, Parameters::DBTYPE_NUCLEOTIDES);
std::string outDbIndex = outDb + ".index";
DBWriter writer(outDb.c_str(), outDbIndex.c_str(), par.threads, par.compressed, Parameters::DBTYPE_NUCLEOTIDES);
writer.open();
DBWriter headerWriter(par.hdr3.c_str(), par.hdr3Index.c_str(), 1, par.compressed, Parameters::DBTYPE_GENERIC_DB);
std::string outHdr = outDb + "_h";
std::string outHdrIndex = outDb + "_h.index";
DBWriter headerWriter(outHdr.c_str(), outHdrIndex.c_str(), par.threads, par.compressed, Parameters::DBTYPE_GENERIC_DB);
headerWriter.open();

bool shouldCompareType = par.gffType.length() > 0;

Debug::Progress progress;
unsigned int entries_num = 0;
char buffer[1024];
const char* fields[255];

std::string revStr;
revStr.reserve(par.maxSeqLen + 1);

while (*data != '\0') {
progress.updateProgress();

// line is a comment
if (*data == '#') {
data = Util::skipLine(data);
continue;
}

const size_t columns = Util::getWordsOfLine(data, fields, 255);
data = Util::skipLine(data);
if (columns < 9) {
Debug(Debug::WARNING) << "Not enough columns in GFF file\n";
continue;
std::string outLookup = outDb + ".lookup";
std::string outLookupIndex = outDb + ".lookup.index";
DBWriter lookupWriter(outLookup.c_str(), outLookupIndex.c_str(), par.threads, 0, Parameters::DBTYPE_OMIT_FILE);
lookupWriter.open();

FILE *source = FileUtil::openAndDelete((outDb + ".source").c_str(), "w");
for (size_t i = 0; i < par.filenames.size(); ++i) {
if (fprintf(source, "%zu\t%s\n", i, FileUtil::baseName(par.filenames[i]).c_str()) < 0) {
Debug(Debug::ERROR) << "Cannot write to file " << outDb << ".source\n";
return EXIT_FAILURE;
}
}
if (fclose(source) != 0) {
Debug(Debug::ERROR) << "Cannot close file " << outDb << ".source\n";
return EXIT_FAILURE;
}

if (shouldCompareType) {
std::string type(fields[2], fields[3] - fields[2] - 1);
if (type.compare(par.gffType) != 0) {
continue;
}
}
const std::vector<std::string> features = Util::split(par.gffType, ",");
if (features.empty()) {
Debug(Debug::WARNING) << "No feature types given. All features will be extracted\n";
}
std::vector<size_t> featureCount(features.size(), 0);

size_t start = Util::fast_atoi<size_t>(fields[3]);
size_t end = Util::fast_atoi<size_t>(fields[4]);
if (start == end) {
Debug(Debug::WARNING) << "Invalid sequence length in line " << entries_num << "!\n";
continue;
}
if (par.filenames.size() < reader.getSize()) {
Debug(Debug::WARNING) << "Not enough GFF files are provided. Some results might be omitted\n";
}

std::string name(fields[0], fields[1] - fields[0] - 1);
size_t lookupId = reader.getLookupIdByAccession(name);
if (lookupId == SIZE_MAX) {
Debug(Debug::ERROR) << "GFF entry not found in database lookup: " << name << "!\n";
return EXIT_FAILURE;
unsigned int entries_num = 0;
Debug::Progress progress(par.filenames.size());
#pragma omp parallel
{
int thread_idx = 0;
#ifdef OPENMP
thread_idx = omp_get_thread_num();
#endif

char buffer[32768];
const char* fields[255];

std::string revStr;
revStr.reserve(par.maxSeqLen + 1);

std::vector<size_t> localFeatureCount(features.size(), 0);

#pragma omp for schedule(dynamic, 1) nowait
for (size_t i = 0; i < par.filenames.size(); ++i) {
progress.updateProgress();
MemoryMapped file(par.filenames[i], MemoryMapped::WholeFile, MemoryMapped::SequentialScan);
if (!file.isValid()) {
Debug(Debug::ERROR) << "Could not open GFF file " << par.filenames[i] << "\n";
EXIT(EXIT_FAILURE);
}
char *data = (char *) file.getData();
size_t idx = 0;
while (*data != '\0') {
// line is a comment or empty
if (*data == '#' || *data == '\n') {
data = Util::skipLine(data);
continue;
}

const size_t columns = Util::getFieldsOfLine(data, fields, 255);
data = Util::skipLine(data);
if (columns < 9) {
Debug(Debug::WARNING) << "Not enough columns in GFF file\n";
continue;
}

if (features.empty() == false) {
bool shouldSkip = true;
std::string type(fields[2], fields[3] - fields[2] - 1);
for (size_t i = 0; i < features.size(); ++i) {
if (type.compare(features[i]) == 0) {
localFeatureCount[i]++;
shouldSkip = false;
break;
}
}
if (shouldSkip) {
continue;
}
}
size_t start = Util::fast_atoi<size_t>(fields[3]);
size_t end = Util::fast_atoi<size_t>(fields[4]);
if (start == end) {
Debug(Debug::WARNING) << "Invalid sequence length in line " << idx << "\n";
continue;
}
std::string strand(fields[6], fields[7] - fields[6] - 1);
std::string name(fields[0], fields[1] - fields[0] - 1);
size_t lookupId = reader.getLookupIdByAccession(name);
if (lookupId == SIZE_MAX) {
Debug(Debug::ERROR) << "GFF entry not found in database lookup: " << name << "\n";
EXIT(EXIT_FAILURE);
}
unsigned int lookupKey = reader.getLookupKey(lookupId);
size_t seqId = reader.getId(lookupKey);
if (seqId == UINT_MAX) {
Debug(Debug::ERROR) << "GFF entry not found in sequence database: " << name << "\n";
EXIT(EXIT_FAILURE);
}

unsigned int key = __sync_fetch_and_add(&(entries_num), 1);
size_t bufferLen;
if (strand == "+") {
bufferLen = Orf::writeOrfHeader(buffer, lookupKey, start, end, 0, 0);
} else {
bufferLen = Orf::writeOrfHeader(buffer, lookupKey, end, start, 0, 0);
}
headerWriter.writeData(buffer, bufferLen, key, thread_idx);

char *seq = reader.getData(seqId, thread_idx);
//check the strand instead of end - start
ssize_t length = end - start + 1;
writer.writeStart(thread_idx);
if (strand == "+") {
size_t len = snprintf(buffer, sizeof(buffer), "%u\t%s_%zu_%zu_%zu\t%zu\n", key, name.c_str(), idx, start, end, i);
lookupWriter.writeData(buffer, len, thread_idx, false, false);
writer.writeAdd(seq + start - 1 , length, thread_idx);
} else {
size_t len = snprintf(buffer, sizeof(buffer), "%u\t%s_%zu_%zu_%zu\t%zu\n", key, name.c_str(), idx, end, start, i);
lookupWriter.writeData(buffer, len, thread_idx, false, false);
for (size_t i = end - 1 ; i >= end - length; i--) {
revStr.append(1, Orf::complement(seq[i]));
}
writer.writeAdd(revStr.c_str(), revStr.size(), thread_idx);
revStr.clear();
}
writer.writeAdd("\n", 1, thread_idx);
writer.writeEnd(key, thread_idx);
idx++;
}
file.close();
}
unsigned int key = reader.getLookupKey(lookupId);

size_t headerId = headerReader.getId(key);
if (headerId == UINT_MAX) {
Debug(Debug::ERROR) << "GFF entry not found in header database: " << name << "!\n";
return EXIT_FAILURE;
#pragma omp critical
for (size_t i = 0; i < features.size(); ++i) {
featureCount[i] += localFeatureCount[i];
}
unsigned int id = par.identifierOffset + entries_num;

headerWriter.writeStart(0);
headerWriter.writeAdd(headerReader.getData(headerId, 0), std::max(headerReader.getSeqLen(headerId), (size_t)2) - 2, 0);
int len = snprintf(buffer, 1024, " %zu-%zu\n", start, end);
headerWriter.writeAdd(buffer, len, 0);
headerWriter.writeEnd(id, 0);

size_t seqId = reader.getId(key);
if (seqId == UINT_MAX) {
Debug(Debug::ERROR) << "GFF entry not found in sequence database: " << name << "!\n";
return EXIT_FAILURE;
}
lookupWriter.close(true);
FileUtil::remove(lookupWriter.getIndexFileName());
headerWriter.close(true);
writer.close(true);
headerReader.close();
reader.close();
if (Debug::debugLevel >= Debug::INFO && features.size() > 0) {
Debug(Debug::INFO) << "Found these feature types and counts:\n";
for (size_t i = 0; i < features.size(); ++i) {
Debug(Debug::INFO) << " - " << features[i] << ": " << featureCount[i] << "\n";
}
} else {
Debug(Debug::INFO) << (entries_num + 1) << " features were extracted\n";
}

ssize_t length = end - start;
char *seq = reader.getData(seqId, 0);

writer.writeStart(0);
if (length > 0) {
writer.writeAdd(seq + start, length + 1, 0);
} else {
for (size_t i = start; i >= start + length; i--) {
revStr.append(1, Orf::complement(seq[i]));
if (par.filenames.size() > 1 && par.threads > 1) {
// make identifiers stable
#pragma omp parallel
{
#pragma omp single
{
#pragma omp task
{
DBWriter::createRenumberedDB(outHdr, outHdrIndex, "", "");
}

#pragma omp task
{
DBWriter::createRenumberedDB(outDb, outDbIndex, outDb, outDbIndex);
}
}
writer.writeAdd(revStr.c_str(), revStr.size(), 0);
revStr.clear();
}
writer.writeAdd("\n", 1, 0);
writer.writeEnd(id, 0);

entries_num++;
}
headerWriter.close();
writer.close();
headerReader.close();
reader.close();
file.close();

return EXIT_SUCCESS;
}
Expand Down

0 comments on commit 488df86

Please sign in to comment.