Une méthode d'import de données en Python

Où comment développer un ETL basique

Exemple avec CubicWeb

Yann Voté

PyConFr - Octobre 2015

Contexte et problème

Des outils existent déjà

Illustration Talend Open Studio

Principe d'un ETL

Extraction

->

Transformation

->

Chargement

Données sources

->

Données en transit

->

Données finales

La méthode

Un exemple pour illustrer

Bases des accidents corporels de la circulation

Num_Acc,an,mois,jour,col,com,dep
201400000001,14,5,7,3,590,011
201400000033,14,8,5,3,590,305

Rappels sur les générateurs

# List version
def ldivisors(n):
    divs = []
    for i in range(1, n + 1):
        if n % i == 0:
            divs.append(i)
    return divs
# Generator version
def divisors(n):
    for i in range(1, n + 1):
        if n % i == 0:
            yield i
>>> for div in ldivisors(6):
...     print(div, end=' ')
...
1 2 3 6
>>> for div in divisors(6):
...     print(div, end=' ')
...
1 2 3 6

Next

>>> divs6 = divisors(6)  # divisors code is not ran
>>> divs6
<generator object divisors at ...>
>>> next(divs6)
1
>>> s = 'Doing other stuff'
>>> next(divs6)
2
>>> s = 'Doing other stuff'
>>> next(divs6)
3

Générateurs et flux de données

Les générateurs sont souvent utiles dans des flux : chacun vient se « brancher » dans le flux pour transformer/filtrer des données (flow programming ; voir https://wiki.python.org/moin/FlowBasedProgramming)

# A generator
def do_something1(items):
    for item in items:
        <do something on item>
        yield item
# Another one
def do_something2(items):
    for item in items:
        <do other thing>
        yield item
>>> items = [...]
>>> items1 = do_something1(items)
>>> items2 = do_something2(items1)
>>> for item in items2:
...     final_processing(item)

Revenons à notre sujet

Premier principe : une classe unique pour les données en transit

« One class to rule them all »

class TransitObject(object):

    def __init__(self, t_id, dest, data=None):
        self.t_id = t_id
        self.dest = dest
        self.data = data or {}

    def __repr__(self):
        return '<{dest} {t_id} {data}>'.format(
            dest=self.dest, t_id=self.t_id, data=self.data)

En clair

Second principe : flux

En pratique

Imiter un ETL

Extraction

->

Transformation

->

Chargement

Données sources

->

TransitObject

->

Données finales

Extraction

Générer un flux de TransitObject

-- Rappel du fichier CSV
Num_Acc,an,mois,jour,col,com,dep
201400000001,14,5,7,3,590,011

---

def extract(filename, dest, t_id_col=None):
    with open(filename) as f:
        csvreader = csv.DictReader(f)
        for data in csvreader:
            t_id = data[t_id_col] if t_id_col else u''
            yield TransitObject(t_id=u'{0}{1}'.format(dest, t_id),
                                dest=dest, data=data)

---

>>> t_objs = extract('caracteristiques_2014.csv', dest='accident',
                     t_id_col='Num_Acc')
>>> t_obj = next(t_objs)
>>> t_obj
<accident accident201400000001 {'Num_Acc': '201400000001',
'jour': '7', 'dep': '590', 'mois': '5', 'an': '14', 'com': '011', 'col': '3'}>

Transformation

Utiliser des fonctions de signature TransitObject, * -> TransitObject

Exemple: ajout d'un attribut calculé

def t_add_expr_attribute(t_obj, attr_name, func, *attrs):
    new_t_obj = TransitObject(t_id=t_obj.t_id, dest=t_obj.dest,
                              data=t_obj.data.copy())
    args = [new_t_obj.data[attr] for attr in attrs]
    new_t_obj.data[attr_name] = func(*args)
    return new_t_obj

---

>>> t_add_expr_attribute(t_obj, 'date',
...     lambda x, y, z: datetime.date(int(x), int(y), int(x)),
...     'an', 'mois', 'jour')
<accident accident201400000001 {'jour': '7', 'dep': '590', 'col': '3',
'an': '14', 'date': datetime.date(14, 5, 7), 'mois': '5',
'com': '011', 'Num_Acc': '201400000001'}>

Transformation (autre exemple)

Renommer un attribut.

def t_rename_attribute(t_obj, old_name, new_name):
    new_t_obj = TransitObject(t_id=t_obj.t_id, dest=t_obj.dest,
                              data=t_obj.data.copy())
    value = new_t_obj.data.pop(old_name, None)
    if value is not None:
        new_t_obj.data[new_name] = value
    return new_t_obj

---

>>> t_rename_attribute(t_obj, 'Num_Acc', 'id')
<accident accident201400000001 {'jour': '7', 'dep': '590', 'col': '3',
'an': '14', 'date': datetime.date(14, 5, 7), 'mois': '5',
'com': '011', 'id': '201400000001'}>

Transformation (dernier exemple)

Supprimer des attributs.

def t_drop_attributes(t_obj, *attrs):
    new_t_obj = TransitObject(t_id=t_obj.t_id, dest=t_obj.dest,
                              data=t_obj.data.copy())
    for attr in attrs:
        new_t_obj.data.pop(attr)
    return new_t_obj

---

>>> t_drop_attributes(t_obj, 'jour', 'mois', 'an', 'dep', 'com', 'col')
<accident accident201400000001 {'date': datetime.date(14, 5, 7),
'id': '201400000001'}>

On rassemble

def transform(t_objs):
    for t_obj in t_objs:
        t_obj = t_add_expr_attribute(
            t_obj, 'date',
            lambda x, y, z: datetime.date(int(x), int(y), int(z)),
            'an',ql'mois', 'jour')
        t_obj = t_add_expr_attribute(
            t_obj, 'code_commune', code_commune, 'dep', 'com')
        t_obj = t_rename_attribute(t_obj, 'Num_Acc', 'id')
        type_id = t_obj.data['col']
        type_data = {'id': type_id, 'nom': types[type_id]}
        yield TransitObject(t_id=u'type{0}'.format(type_id),
                            dest='type_accident', data=type_data)
        t_obj.data['type_accident_id'] = type_id
        t_obj = t_drop_attributes(t_obj, 'jour', 'mois', 'an',
                                  'dep', 'com', 'col')
        yield t_obj

Chargement

Terminer le flux de TransitObject

def load(t_objs):
    conn = psycopg2.connect("dbname=...")
    cur = conn.cursor()
    for t_obj in t_objs:
        sql = 'INSERT INTO accident '
              '(id, date, type_accident_id, code_commune) VALUES '
              '(%(id)s, %(date)s, %(type_accident_id)s, %(code_commune)s)'
        if t_ojb.dest == 'type_accident':
            sql = 'INSERT INTO type_accident (id, nom) '
                  'VALUES (%(id)s, %(nom)s)'
        cur.execute(sql)
    conn.commit()
    cur.close()
    conn.close()

Workflow complet

def import(filename, dest, t_id_col=None):
    t_objs = extract(filename, dest=dest, t_id_col=t_id_col)
    t_objs = transform(t_objs)
    load(t_objs)

Et maintenant ?

Ce n'était qu'un teaser

Un coup d'œil sur les co-routines

Co-routine ?

def coroutine(items):
    for item in items:
        <do something on item>
        value = yield item
        <do something with value>

---

>>> l = [...]
>>> cor = coroutine(l)
>>> cor.next()  # or cor.send(None)
>>> x = 'value'
>>> item = cor.send(x)
>>> s = 'do something with item'
>>> x = 'other value'
>>> item = cor.send(x)

Co-routine pour fusionner

Références

Fin

Merci de votre attention

SpaceForward
Right, Down, Page DownNext slide
Left, Up, Page UpPrevious slide
POpen presenter console
HToggle this help