A good tutorial providing fixes for printing XML using Python and minidom, specifically fixing the extra whitespace it adds and the conversion of characters into HTML entities.
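The tutorial itself isn't reproduced here, but for the whitespace half of the problem a common fix looks like the sketch below (the helper name and indent width are my own choices):

```python
from xml.dom import minidom

def pretty_xml(xml_string):
    """Pretty-print an XML string, stripping the whitespace-only lines
    that minidom's toprettyxml() tends to insert between elements."""
    pretty = minidom.parseString(xml_string).toprettyxml(indent='  ')
    # Keep only lines that contain something other than indentation.
    return '\n'.join(line for line in pretty.split('\n') if line.strip())

result = pretty_xml('<root><item>a &amp; b</item></root>')
```

Note this only addresses the added whitespace; the entity-conversion fix depends on how the document is built and is covered by the linked tutorial.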
The Python subprocess module is a highly flexible way of running background processes in Python. Here are two functions that show how to run external shell commands: one waits for the command to return, and the other runs in the background, allowing the controlling script to stop it as required.
Both use shell=True, so passing unsanitised input to these functions will not end well.
```python
import subprocess

def background_run_command(cmd):
    """Runs a bash command in the background, returns the Popen
    instance, does not block."""
    return subprocess.Popen([cmd], shell=True, executable='/bin/bash')

def run_command(cmd):
    """Runs an external command in bash, waits until the process has
    returned and returns (return code, output)."""
    proc = subprocess.Popen([cmd], shell=True, executable='/bin/bash',
                            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # communicate() drains the pipe while waiting, avoiding the deadlock
    # that wait() can hit if the child fills its stdout buffer.
    output, _ = proc.communicate()
    return (proc.returncode, output)
```
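A quick usage sketch (the blocking helper is re-defined here so the snippet is self-contained; echo and sleep are just stand-in commands):

```python
import subprocess

def run_command(cmd):
    """Run cmd through bash, block until it finishes, return (returncode, output)."""
    proc = subprocess.Popen([cmd], shell=True, executable='/bin/bash',
                            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    output, _ = proc.communicate()
    return proc.returncode, output

rc, out = run_command("echo hello")

# Background variant: Popen returns immediately, so the caller can
# stop the process whenever it likes.
proc = subprocess.Popen(["sleep 30"], shell=True, executable='/bin/bash')
proc.terminate()
proc.wait()
```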
Monit is an excellent monitoring and process management tool with a huge array of features in its custom syntax. One option is to run programs as a specific user, for example:
```
check process tomcat with pidfile /var/run/tomcat.pid
    start program = "/etc/init.d/tomcat start"
        as uid nobody and gid nobody
    stop program = "/etc/init.d/tomcat stop"
    # You can also use id numbers instead and write:
    # as uid 99 and with gid 99
    if failed port 8080 then restart
```
I recently had trouble getting Monit to correctly change the UID and GID of an application. There were very few error messages available, but restarting it with the -v flag showed me 'Failed to change UID'.
It took much guesswork to find the cause, which turned out to be that SELinux was enabled; disabling SELinux in /etc/selinux/config solved it. I assume there is some better way of configuring SELinux to allow Monit's requests to change UID, but as this is an internal test system it doesn't matter.
Given multiple CSV files containing a timestamp column and some plottable values (int or float) this script will plot multiple columns of data by time.
Usage is as such – ./CSVPlot.py first.csv second.csv third.csv
```python
#!/usr/bin/python
import numpy
from matplotlib import pyplot as plt
from datetime import datetime
import sys

timestamps = []
legend = []

for arg in sys.argv[1:]:
    data = numpy.genfromtxt(arg, delimiter=",", dtype=None, names=True)
    for columnName, dataType in data.dtype.descr:
        # Skip string columns and the timestamp column itself.
        if not isinstance(data[columnName][0], basestring):
            if columnName.strip() != 'timestamp':
                timestamps = [datetime.fromtimestamp(x) for x in data['timestamp']]
                plt.plot(timestamps, data[columnName])
                legend.append(columnName)

plt.legend(legend, loc='upper right')
#plt.savefig('test.png')
plt.xticks(rotation=70)
plt.show()
```
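To try the script out you need CSV files in the shape it expects: a timestamp column plus numeric columns. A throwaway generator ('cpu' and 'memory' are made-up column names for illustration):

```python
# Generate a sample CSV in the shape the plotting script expects:
# a 'timestamp' column plus numeric (int or float) columns.
import csv
import random
import time

with open('first.csv', 'w') as f:
    writer = csv.writer(f)
    writer.writerow(['timestamp', 'cpu', 'memory'])
    start = int(time.time())
    for i in range(60):
        # One row per second, with random values to plot.
        writer.writerow([start + i,
                         round(random.uniform(0, 100), 2),
                         random.randint(0, 8192)])
```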
A small thing that caught me out today: while writing a unit test for C# I wanted to check that a list was returned in the expected order, and used Assert.AreEqual to compare the lists. This kept failing despite the two lists being exactly the same.
It turns out that there is a separate CollectionAssert class to be used for lists, or any form of C# collection. Because the List&lt;T&gt; class does not override the Equals method, Assert.AreEqual will just compare the references instead.
Here is an example of a working unit test comparing two lists.
```csharp
using System;
using System.Collections.Generic;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace TestFunctions
{
    [TestClass]
    public class SortedTupleDictionaryTest
    {
        [TestMethod]
        public void TestSortedDictionaryTupleOrder()
        {
            SortedDictionary<Tuple<int, int, int>, string> dict =
                new SortedDictionary<Tuple<int, int, int>, string>();
            dict.Add(Tuple.Create(1, 1, 1), "apple");
            dict.Add(Tuple.Create(1, 2, 1), "chair");
            dict.Add(Tuple.Create(1, 1, 2), "bear");
            dict.Add(Tuple.Create(2, 2, 1), "deer");

            List<Tuple<int, int, int>> expected = new List<Tuple<int, int, int>>();
            expected.Add(Tuple.Create(1, 1, 1));
            expected.Add(Tuple.Create(1, 1, 2));
            expected.Add(Tuple.Create(1, 2, 1));
            expected.Add(Tuple.Create(2, 2, 1));

            CollectionAssert.AreEqual(expected, dict.Keys);
        }
    }
}
```
An extension of the standard Python queue that only stores elements for a given number of seconds before removing them. Useful if you don't know the volume of data being added to the queue but need to limit it in some way.
There is no decrease in performance when removing items.
Without item removal:

```
Queue size: 10000
real    0m10.771s
```

With item removal:

```
Queue size: 4659
real    0m10.765s
```
```python
#!/usr/bin/python
"""A time-bounded queue; can use any Queue class as a base as long as it
follows the standard Python Queue format
(http://docs.python.org/2/library/queue.html)."""

from collections import deque
from time import time
from Queue import Queue


class TimeBoundQueue(Queue):

    def __init__(self, max_time_seconds):
        '''Elements older than max_time_seconds will be removed from the queue.'''
        Queue.__init__(self)
        self.timeout = max_time_seconds
        self.timeout_count = 100

    def _put(self, item):
        '''Add element to queue. Creates an internal tuple of
        (timestamp, element), also triggers removal of timed out
        elements every 100 adds.'''
        self.timeout_count -= 1
        if self.timeout_count <= 0:
            self.__timeout()
            self.timeout_count = 100
        elem = (time(), item)
        self.queue.append(elem)

    def _get(self):
        return self.queue.popleft()[1]

    def __timeout(self):
        '''Remove oldest elements up to time()-max_time_seconds.'''
        valid_timestamp = time() - self.timeout
        completed = False
        while not completed:
            try:
                timestamp, item = self.queue[0]
                if timestamp < valid_timestamp:
                    # Quicker to pop than to remove a value at a random position.
                    self.queue.popleft()
                else:
                    completed = True
            except IndexError:
                # No elements left.
                return


if __name__ == "__main__":
    from time import sleep
    t_queue = TimeBoundQueue(5)
    for i in range(0, 10000):
        t_queue.put(i)
        sleep(0.001)
    print "Queue size: {0}".format(t_queue.qsize())
```
I wrote a Python script that can dump a SQLite database to CSV; it takes the column names as the header and then writes the values below. It also has a --summarise option that can collapse multiple rows in a database into a one-line CSV file.
It has a stub for XML output that could be supported in the future, but a format would need to be defined by the user.
```python
#!/usr/bin/python
import sqlite3
import csv
from optparse import OptionParser


class SQLiteDumper():

    def __init__(self, database_path, statistic_type, output_file_name,
                 output_format, summarise):
        self.databaseLocation = database_path
        self.statisticType = statistic_type
        self.outputFormat = output_format
        self.summarise = summarise
        self.outputFileName = output_file_name
        self.table = self.__typeToTable()
        self.__connectToDatabase()
        self.haveWrittenHeader = False
        self.notToSummarise = ['timestamp', 'component', 'some_string']
        self.csvFile = None
        try:
            self.outputFile = open(self.outputFileName, "w+")
            if self.outputFormat == "CSV":
                self.csvFile = csv.writer(self.outputFile)
        except IOError as e:
            print e

    def __typeToTable(self):
        types = {"Views": "views",
                 "Errors": "errors"}
        return types[self.statisticType]

    def __connectToDatabase(self):
        self.dbConnection = sqlite3.connect(self.databaseLocation,
                                            detect_types=sqlite3.PARSE_DECLTYPES)

    def writeToCSV(self, results):
        if not self.haveWrittenHeader:
            self.csvFile.writerow(results.keys())
            self.haveWrittenHeader = True
        self.csvFile.writerow(results.values())

    def writeToXML(self, results):
        # This requires a bit more clarification about the output format.
        raise NotImplementedError("Writing to XML support currently not available.")

    def __write(self, result):
        if self.outputFormat == "CSV":
            self.writeToCSV(result)
        elif self.outputFormat == "XML":
            self.writeToXML(result)

    def processDatabase(self):
        cursor = self.dbConnection.cursor()
        result = {}
        for row in cursor.execute('SELECT * FROM {0} ORDER BY timestamp ASC'.format(self.table)):
            for i, key in enumerate(cursor.description):
                if self.summarise:
                    if result.has_key(key[0]) and key[0] not in self.notToSummarise:
                        result[key[0]] += row[i]
                    else:
                        result[key[0]] = row[i]
                else:
                    result[key[0]] = row[i]
            if not self.summarise:
                self.__write(result)
        if self.summarise:
            self.__write(result)


def getOpts():
    parser = OptionParser()
    parser.add_option("-o", "--outputFile", dest="output_file_name",
                      action="store", default="output.csv", type="string",
                      help="File name to store output information")
    parser.add_option("-t", "--statisticType", dest="statistic_type",
                      action="store", default="Views",
                      choices=["Views", "Errors"],
                      help="Statistics type [Views, Errors]")
    parser.add_option("-d", "--database", dest="sqlite_db",
                      action="store", type="string",
                      help="Location of the SQLite database to read.")
    parser.add_option("-f", "--outputFormat", dest="output_format",
                      action="store", default="CSV", choices=["XML", "CSV"],
                      help="Output format [XML, CSV]")
    parser.add_option("-s", "--summarise", dest="summarise",
                      action="store_true", default=False,
                      help="Summarise all results into a single result.")
    (options, args) = parser.parse_args()
    if not options.sqlite_db:
        parser.error("--database argument is required.")
    return options


if __name__ == "__main__":
    opts = getOpts()
    dumper = SQLiteDumper(opts.sqlite_db, opts.statistic_type,
                          opts.output_file_name, opts.output_format,
                          opts.summarise)
    try:
        dumper.processDatabase()
    except Exception as e:
        print e
```
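To test the dumper you need a database containing one of the tables it knows about. A minimal setup sketch; the 'views' table name matches the script's Views mapping, but the 'component' and 'count' columns are made up here (only 'timestamp', which the script orders by, is required):

```python
import sqlite3

# Create a small database the dumper script could read.
conn = sqlite3.connect('stats.db')
conn.execute('CREATE TABLE views (timestamp INTEGER, component TEXT, count INTEGER)')
conn.executemany('INSERT INTO views VALUES (?, ?, ?)',
                 [(1, 'home', 10), (2, 'about', 3), (3, 'home', 7)])
conn.commit()
conn.close()
```

With the script saved as, say, sqlitedump.py, `./sqlitedump.py -d stats.db -t Views -s` would then produce a one-line summary CSV.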
Unit testing is great, code coverage is great and continuous integration servers are great; but what isn’t great is combining all three into something useful!
If you are coding in Java then code coverage is fantastically simple, but what if you use C++ and GoogleTest and want the same level of graphification? Luckily it is also simple if you know how, so you're in the right place.
The scenario here is that you have a C++ project built on a Jenkins/Hudson continuous integration setup, and you use Google Test for your unit tests. You will need GCC 4.7, which includes a tool called gcov; this creates the actual coverage files for your executable, but requires some additional compiler and linker options to be added to your build scripts or makefiles. The final piece of software required is gcovr, a Python script that reads the output files from gcov-linked executables and converts them into an XML format that can be read by the Cobertura plugin in Jenkins/Hudson.
It is a good idea to add a separate make target for coverage, as it requires the executables to be built with no optimisation and full debug symbols, which is not very useful for production environments.
Add the following to your CPPFLAGS:
```
-fprofile-arcs -ftest-coverage -O0
```
And this to your LDFLAGS:
```
-lgcov
```
So for example you could have a section in your makefile that looks like this (assuming the test target builds your unit tests):
```make
coverage: CPPFLAGS += -fprofile-arcs -ftest-coverage -O0
coverage: LDFLAGS += -lgcov
coverage: test
```
After you have run the coverage build target, run the test executable; this generates the coverage data files that describe which lines of code are covered and by what functions.
Then ensure that you can access the gcovr script and run it with the following arguments:
```sh
./test
chmod a+x ./gcovr
./gcovr --gcov-exclude=".*gtest.*" -x -d > coverage.xml
```
The --gcov-exclude option prevents unnecessary parsing of files from the GoogleTest framework. The output is placed into the coverage.xml file, which can then be picked up in Jenkins by adding a new Post-Build Action from the job configuration page.
RPM packages are a great way of packaging and distributing software; unfortunately, some standalone software is distributed as a zip or tar file to be installed by hand. By converting these into an RPM you can use your package manager as a distribution and removal tool. For this example we will be creating an RPM for the Example Software.
- The first thing to do is install the RPM development tools, you should only require rpmbuild.
- Create a folder to be used as a RPM development area and then create a file called .rpmmacros in your home folder that contains the following line: %_topdir <your build folder>
- Set up your RPM build area by creating the following folders: BUILD, BUILDROOT, RPMS, SOURCES, SPECS, SRPMS
- Get a copy of your tar.gz or zipped software and place it in the SOURCES directory.
- Now the key part of this is to create the .spec file as this describes the build and installation part of the RPM.
- Create a new file in the SPECS folder called example.spec and fill it out with the following information:
```spec
# Your desired installation folder for the software.
%define _prefix /usr/bin/example

Name: example
Version: 1.0
Release: 1%{?dist}
Summary: Example software for creating examples
Group: Applications/System
URL: http://www.example.com
BuildRoot: %{_tmppath}/%{name}
BuildArch: x86_64
License: Redistributable, no modification permitted
# The web location for the source file; not required, but the Name and
# Version values must match the file name.
Source: http://example.com/example-1.tar.gz

%description

%prep
%setup -q

%build

%install
# Create a directory for installation.
install -d %{buildroot}%{_prefix}
# rpmbuild will extract the tar.gz file; we must then copy the files to
# our build area and preserve the file structure.
tar cf - . | (cd %{buildroot}%{_prefix}; tar xfp -)

%clean
rm -rf %{buildroot}%{_prefix}

%files
# Modify for your requirements.
%defattr(-,user,usergroup,-)
%dir %{_prefix}
%{_prefix}/*
%doc

%changelog
```
- After your spec file is created, run rpmbuild -bb ./SPECS/example.spec and fix any errors that are produced; the most likely problems will come from getting the correct version/name and target architecture.
- Find your completed RPM in ./RPMS/<ARCH>
Done!
I am lucky enough to be a programmer who doesn't have to spend his whole life in an office churning out software; sometimes, if the software isn't good enough, we get to venture out to sea and apply some frontline support!
I've been on several trips and they have been a great experience, if extremely tiring. Here's a selection of photos from my most recent trip to the cold north (it's actually very nice).
I got quite excited about the recent push into wearable computers and bought a Jawbone Up to keep track of everything I do. It is a neat system that tracks sleep and movement during the day, and it comes with an excellent mobile application for keeping track of this data. Data is transferred from the band using the 3.5mm headphone jack, which I thought was quite smart. The only problem? It doesn't work with the HTC One X, pretty much the only phone it doesn't work with.
I have tried pretty much everything to get it working, and also got in touch with Jawbone to find out what was going on (there has been a complete lack of communication and information regarding this issue). After a month they decided to get back to me; the emails are at the bottom of this post.
It looks like I will have to replace either the phone or the band. I was looking at getting the Fitbit Flex, as it transfers over Bluetooth, which is a much better system, although I've heard there are issues with various Android phones there too!
© 2025 Acodemics